You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/12/03 05:11:13 UTC
[1/2] kylin git commit: KYLIN-3060 Fix a server-side bug in the logic
for creating or updating a streaming table
that causes a NullPointerException.
Repository: kylin
Updated Branches:
refs/heads/master 84779827a -> c5d1fd07f
KYLIN-3060 Fix a server-side bug in the logic for creating or updating a streaming table that causes a NullPointerException.
Signed-off-by: Li Yang <li...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/e53257d1
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/e53257d1
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/e53257d1
Branch: refs/heads/master
Commit: e53257d1fad0edc43cc625b4e3bc63723e403f4e
Parents: 8477982
Author: peng.jianhua <pe...@zte.com.cn>
Authored: Fri Nov 24 21:16:28 2017 +0800
Committer: Li Yang <li...@apache.org>
Committed: Sun Dec 3 13:06:10 2017 +0800
----------------------------------------------------------------------
.../kylin/rest/controller/StreamingController.java | 15 +++++++++++++++
1 file changed, 15 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/e53257d1/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
index 593abea..bd396be 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
@@ -114,7 +114,13 @@ public class StreamingController extends BasicController {
}
StreamingConfig streamingConfig = deserializeSchemalDesc(streamingRequest);
+ if(!streamingRequest.isSuccessful()){
+ return streamingRequest;
+ }
KafkaConfig kafkaConfig = deserializeKafkaSchemalDesc(streamingRequest);
+ if(!streamingRequest.isSuccessful()){
+ return streamingRequest;
+ }
boolean saveStreamingSuccess = false, saveKafkaSuccess = false;
try {
@@ -181,7 +187,13 @@ public class StreamingController extends BasicController {
@ResponseBody
public StreamingRequest updateStreamingConfig(@RequestBody StreamingRequest streamingRequest) throws JsonProcessingException {
StreamingConfig streamingConfig = deserializeSchemalDesc(streamingRequest);
+ if(!streamingRequest.isSuccessful()){
+ return streamingRequest;
+ }
KafkaConfig kafkaConfig = deserializeKafkaSchemalDesc(streamingRequest);
+ if(!streamingRequest.isSuccessful()){
+ return streamingRequest;
+ }
String project = streamingRequest.getProject();
if (streamingConfig == null) {
return streamingRequest;
@@ -230,6 +242,7 @@ public class StreamingController extends BasicController {
try {
logger.debug("Saving TableDesc " + streamingRequest.getTableData());
desc = JsonUtil.readValue(streamingRequest.getTableData(), TableDesc.class);
+ updateRequest(streamingRequest, true, null);
} catch (JsonParseException e) {
logger.error("The TableDesc definition is invalid.", e);
updateRequest(streamingRequest, false, e.getMessage());
@@ -255,6 +268,7 @@ public class StreamingController extends BasicController {
try {
logger.debug("Saving StreamingConfig " + streamingRequest.getStreamingConfig());
desc = JsonUtil.readValue(streamingRequest.getStreamingConfig(), StreamingConfig.class);
+ updateRequest(streamingRequest, true, null);
} catch (JsonParseException e) {
logger.error("The StreamingConfig definition is invalid.", e);
updateRequest(streamingRequest, false, e.getMessage());
@@ -273,6 +287,7 @@ public class StreamingController extends BasicController {
try {
logger.debug("Saving KafkaConfig " + streamingRequest.getKafkaConfig());
desc = JsonUtil.readValue(streamingRequest.getKafkaConfig(), KafkaConfig.class);
+ updateRequest(streamingRequest, true, null);
} catch (JsonParseException e) {
logger.error("The KafkaConfig definition is invalid.", e);
updateRequest(streamingRequest, false, e.getMessage());
[2/2] kylin git commit: KYLIN-3060 code format
Posted by li...@apache.org.
KYLIN-3060 code format
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c5d1fd07
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c5d1fd07
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c5d1fd07
Branch: refs/heads/master
Commit: c5d1fd07f5cb16978b1cc626edf556c0a2be4dcf
Parents: e53257d
Author: Li Yang <li...@apache.org>
Authored: Sun Dec 3 13:11:03 2017 +0800
Committer: Li Yang <li...@apache.org>
Committed: Sun Dec 3 13:11:03 2017 +0800
----------------------------------------------------------------------
.../rest/controller/StreamingController.java | 39 ++++++++++++++------
1 file changed, 27 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/c5d1fd07/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
index bd396be..ccd489d 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
@@ -77,7 +77,10 @@ public class StreamingController extends BasicController {
@RequestMapping(value = "/getConfig", method = { RequestMethod.GET }, produces = { "application/json" })
@ResponseBody
- public List<StreamingConfig> getStreamings(@RequestParam(value = "table", required = false) String table, @RequestParam(value = "project", required = false) String project, @RequestParam(value = "limit", required = false) Integer limit, @RequestParam(value = "offset", required = false) Integer offset) {
+ public List<StreamingConfig> getStreamings(@RequestParam(value = "table", required = false) String table,
+ @RequestParam(value = "project", required = false) String project,
+ @RequestParam(value = "limit", required = false) Integer limit,
+ @RequestParam(value = "offset", required = false) Integer offset) {
try {
return streamingService.getStreamingConfigs(table, project, limit, offset);
} catch (IOException e) {
@@ -89,7 +92,11 @@ public class StreamingController extends BasicController {
@RequestMapping(value = "/getKfkConfig", method = { RequestMethod.GET }, produces = { "application/json" })
@ResponseBody
- public List<KafkaConfig> getKafkaConfigs(@RequestParam(value = "kafkaConfigName", required = false) String kafkaConfigName, @RequestParam(value = "project", required = false) String project, @RequestParam(value = "limit", required = false) Integer limit, @RequestParam(value = "offset", required = false) Integer offset) {
+ public List<KafkaConfig> getKafkaConfigs(
+ @RequestParam(value = "kafkaConfigName", required = false) String kafkaConfigName,
+ @RequestParam(value = "project", required = false) String project,
+ @RequestParam(value = "limit", required = false) Integer limit,
+ @RequestParam(value = "offset", required = false) Integer offset) {
try {
return kafkaConfigService.getKafkaConfigs(kafkaConfigName, project, limit, offset);
} catch (IOException e) {
@@ -114,11 +121,11 @@ public class StreamingController extends BasicController {
}
StreamingConfig streamingConfig = deserializeSchemalDesc(streamingRequest);
- if(!streamingRequest.isSuccessful()){
+ if (!streamingRequest.isSuccessful()) {
return streamingRequest;
}
KafkaConfig kafkaConfig = deserializeKafkaSchemalDesc(streamingRequest);
- if(!streamingRequest.isSuccessful()){
+ if (!streamingRequest.isSuccessful()) {
return streamingRequest;
}
boolean saveStreamingSuccess = false, saveKafkaSuccess = false;
@@ -152,7 +159,8 @@ public class StreamingController extends BasicController {
try {
streamingService.dropStreamingConfig(streamingConfig, project);
} catch (IOException e1) {
- throw new InternalErrorException("StreamingConfig is created, but failed to create KafkaConfig: " + e.getLocalizedMessage());
+ throw new InternalErrorException(
+ "StreamingConfig is created, but failed to create KafkaConfig: " + e.getLocalizedMessage());
}
logger.error("Failed to save KafkaConfig:" + e.getLocalizedMessage(), e);
throw new InternalErrorException("Failed to save KafkaConfig: " + e.getLocalizedMessage());
@@ -161,11 +169,14 @@ public class StreamingController extends BasicController {
if (saveKafkaSuccess == false || saveStreamingSuccess == false) {
if (saveStreamingSuccess == true) {
- StreamingConfig sConfig = streamingService.getStreamingManager().getStreamingConfig(streamingConfig.getName());
+ StreamingConfig sConfig = streamingService.getStreamingManager()
+ .getStreamingConfig(streamingConfig.getName());
try {
streamingService.dropStreamingConfig(sConfig, project);
} catch (IOException e) {
- throw new InternalErrorException("Action failed and failed to rollback the created streaming config: " + e.getLocalizedMessage());
+ throw new InternalErrorException(
+ "Action failed and failed to rollback the created streaming config: "
+ + e.getLocalizedMessage());
}
}
if (saveKafkaSuccess == true) {
@@ -173,7 +184,9 @@ public class StreamingController extends BasicController {
KafkaConfig kConfig = kafkaConfigService.getKafkaConfig(kafkaConfig.getName(), project);
kafkaConfigService.dropKafkaConfig(kConfig, project);
} catch (IOException e) {
- throw new InternalErrorException("Action failed and failed to rollback the created kafka config: " + e.getLocalizedMessage());
+ throw new InternalErrorException(
+ "Action failed and failed to rollback the created kafka config: "
+ + e.getLocalizedMessage());
}
}
}
@@ -185,13 +198,14 @@ public class StreamingController extends BasicController {
@RequestMapping(value = "", method = { RequestMethod.PUT }, produces = { "application/json" })
@ResponseBody
- public StreamingRequest updateStreamingConfig(@RequestBody StreamingRequest streamingRequest) throws JsonProcessingException {
+ public StreamingRequest updateStreamingConfig(@RequestBody StreamingRequest streamingRequest)
+ throws JsonProcessingException {
StreamingConfig streamingConfig = deserializeSchemalDesc(streamingRequest);
- if(!streamingRequest.isSuccessful()){
+ if (!streamingRequest.isSuccessful()) {
return streamingRequest;
}
KafkaConfig kafkaConfig = deserializeKafkaSchemalDesc(streamingRequest);
- if(!streamingRequest.isSuccessful()){
+ if (!streamingRequest.isSuccessful()) {
return streamingRequest;
}
String project = streamingRequest.getProject();
@@ -220,7 +234,8 @@ public class StreamingController extends BasicController {
return streamingRequest;
}
- @RequestMapping(value = "/{project}/{configName}", method = { RequestMethod.DELETE }, produces = { "application/json" })
+ @RequestMapping(value = "/{project}/{configName}", method = { RequestMethod.DELETE }, produces = {
+ "application/json" })
@ResponseBody
public void deleteConfig(@PathVariable String project, @PathVariable String configName) throws IOException {
StreamingConfig config = streamingService.getStreamingManager().getStreamingConfig(configName);