You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/12/03 05:11:13 UTC

[1/2] kylin git commit: KYLIN-3060 The logical processing of creating or updating streaming table has a bug in server, which will cause a NullPointerException.

Repository: kylin
Updated Branches:
  refs/heads/master 84779827a -> c5d1fd07f


KYLIN-3060 The logical processing of creating or updating streaming table has a bug in server, which will cause a NullPointerException.

Signed-off-by: Li Yang <li...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/e53257d1
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/e53257d1
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/e53257d1

Branch: refs/heads/master
Commit: e53257d1fad0edc43cc625b4e3bc63723e403f4e
Parents: 8477982
Author: peng.jianhua <pe...@zte.com.cn>
Authored: Fri Nov 24 21:16:28 2017 +0800
Committer: Li Yang <li...@apache.org>
Committed: Sun Dec 3 13:06:10 2017 +0800

----------------------------------------------------------------------
 .../kylin/rest/controller/StreamingController.java   | 15 +++++++++++++++
 1 file changed, 15 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/e53257d1/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
index 593abea..bd396be 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
@@ -114,7 +114,13 @@ public class StreamingController extends BasicController {
         }
 
         StreamingConfig streamingConfig = deserializeSchemalDesc(streamingRequest);
+        if(!streamingRequest.isSuccessful()){
+            return streamingRequest;
+        }
         KafkaConfig kafkaConfig = deserializeKafkaSchemalDesc(streamingRequest);
+        if(!streamingRequest.isSuccessful()){
+            return streamingRequest;
+        }
         boolean saveStreamingSuccess = false, saveKafkaSuccess = false;
 
         try {
@@ -181,7 +187,13 @@ public class StreamingController extends BasicController {
     @ResponseBody
     public StreamingRequest updateStreamingConfig(@RequestBody StreamingRequest streamingRequest) throws JsonProcessingException {
         StreamingConfig streamingConfig = deserializeSchemalDesc(streamingRequest);
+        if(!streamingRequest.isSuccessful()){
+            return streamingRequest;
+        }
         KafkaConfig kafkaConfig = deserializeKafkaSchemalDesc(streamingRequest);
+        if(!streamingRequest.isSuccessful()){
+            return streamingRequest;
+        }
         String project = streamingRequest.getProject();
         if (streamingConfig == null) {
             return streamingRequest;
@@ -230,6 +242,7 @@ public class StreamingController extends BasicController {
         try {
             logger.debug("Saving TableDesc " + streamingRequest.getTableData());
             desc = JsonUtil.readValue(streamingRequest.getTableData(), TableDesc.class);
+            updateRequest(streamingRequest, true, null);
         } catch (JsonParseException e) {
             logger.error("The TableDesc definition is invalid.", e);
             updateRequest(streamingRequest, false, e.getMessage());
@@ -255,6 +268,7 @@ public class StreamingController extends BasicController {
         try {
             logger.debug("Saving StreamingConfig " + streamingRequest.getStreamingConfig());
             desc = JsonUtil.readValue(streamingRequest.getStreamingConfig(), StreamingConfig.class);
+            updateRequest(streamingRequest, true, null);
         } catch (JsonParseException e) {
             logger.error("The StreamingConfig definition is invalid.", e);
             updateRequest(streamingRequest, false, e.getMessage());
@@ -273,6 +287,7 @@ public class StreamingController extends BasicController {
         try {
             logger.debug("Saving KafkaConfig " + streamingRequest.getKafkaConfig());
             desc = JsonUtil.readValue(streamingRequest.getKafkaConfig(), KafkaConfig.class);
+            updateRequest(streamingRequest, true, null);
         } catch (JsonParseException e) {
             logger.error("The KafkaConfig definition is invalid.", e);
             updateRequest(streamingRequest, false, e.getMessage());


[2/2] kylin git commit: KYLIN-3060 code format

Posted by li...@apache.org.
KYLIN-3060 code format


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c5d1fd07
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c5d1fd07
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c5d1fd07

Branch: refs/heads/master
Commit: c5d1fd07f5cb16978b1cc626edf556c0a2be4dcf
Parents: e53257d
Author: Li Yang <li...@apache.org>
Authored: Sun Dec 3 13:11:03 2017 +0800
Committer: Li Yang <li...@apache.org>
Committed: Sun Dec 3 13:11:03 2017 +0800

----------------------------------------------------------------------
 .../rest/controller/StreamingController.java    | 39 ++++++++++++++------
 1 file changed, 27 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/c5d1fd07/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
index bd396be..ccd489d 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
@@ -77,7 +77,10 @@ public class StreamingController extends BasicController {
 
     @RequestMapping(value = "/getConfig", method = { RequestMethod.GET }, produces = { "application/json" })
     @ResponseBody
-    public List<StreamingConfig> getStreamings(@RequestParam(value = "table", required = false) String table, @RequestParam(value = "project", required = false) String project, @RequestParam(value = "limit", required = false) Integer limit, @RequestParam(value = "offset", required = false) Integer offset) {
+    public List<StreamingConfig> getStreamings(@RequestParam(value = "table", required = false) String table,
+            @RequestParam(value = "project", required = false) String project,
+            @RequestParam(value = "limit", required = false) Integer limit,
+            @RequestParam(value = "offset", required = false) Integer offset) {
         try {
             return streamingService.getStreamingConfigs(table, project, limit, offset);
         } catch (IOException e) {
@@ -89,7 +92,11 @@ public class StreamingController extends BasicController {
 
     @RequestMapping(value = "/getKfkConfig", method = { RequestMethod.GET }, produces = { "application/json" })
     @ResponseBody
-    public List<KafkaConfig> getKafkaConfigs(@RequestParam(value = "kafkaConfigName", required = false) String kafkaConfigName, @RequestParam(value = "project", required = false) String project, @RequestParam(value = "limit", required = false) Integer limit, @RequestParam(value = "offset", required = false) Integer offset) {
+    public List<KafkaConfig> getKafkaConfigs(
+            @RequestParam(value = "kafkaConfigName", required = false) String kafkaConfigName,
+            @RequestParam(value = "project", required = false) String project,
+            @RequestParam(value = "limit", required = false) Integer limit,
+            @RequestParam(value = "offset", required = false) Integer offset) {
         try {
             return kafkaConfigService.getKafkaConfigs(kafkaConfigName, project, limit, offset);
         } catch (IOException e) {
@@ -114,11 +121,11 @@ public class StreamingController extends BasicController {
         }
 
         StreamingConfig streamingConfig = deserializeSchemalDesc(streamingRequest);
-        if(!streamingRequest.isSuccessful()){
+        if (!streamingRequest.isSuccessful()) {
             return streamingRequest;
         }
         KafkaConfig kafkaConfig = deserializeKafkaSchemalDesc(streamingRequest);
-        if(!streamingRequest.isSuccessful()){
+        if (!streamingRequest.isSuccessful()) {
             return streamingRequest;
         }
         boolean saveStreamingSuccess = false, saveKafkaSuccess = false;
@@ -152,7 +159,8 @@ public class StreamingController extends BasicController {
                 try {
                     streamingService.dropStreamingConfig(streamingConfig, project);
                 } catch (IOException e1) {
-                    throw new InternalErrorException("StreamingConfig is created, but failed to create KafkaConfig: " + e.getLocalizedMessage());
+                    throw new InternalErrorException(
+                            "StreamingConfig is created, but failed to create KafkaConfig: " + e.getLocalizedMessage());
                 }
                 logger.error("Failed to save KafkaConfig:" + e.getLocalizedMessage(), e);
                 throw new InternalErrorException("Failed to save KafkaConfig: " + e.getLocalizedMessage());
@@ -161,11 +169,14 @@ public class StreamingController extends BasicController {
             if (saveKafkaSuccess == false || saveStreamingSuccess == false) {
 
                 if (saveStreamingSuccess == true) {
-                    StreamingConfig sConfig = streamingService.getStreamingManager().getStreamingConfig(streamingConfig.getName());
+                    StreamingConfig sConfig = streamingService.getStreamingManager()
+                            .getStreamingConfig(streamingConfig.getName());
                     try {
                         streamingService.dropStreamingConfig(sConfig, project);
                     } catch (IOException e) {
-                        throw new InternalErrorException("Action failed and failed to rollback the created streaming config: " + e.getLocalizedMessage());
+                        throw new InternalErrorException(
+                                "Action failed and failed to rollback the created streaming config: "
+                                        + e.getLocalizedMessage());
                     }
                 }
                 if (saveKafkaSuccess == true) {
@@ -173,7 +184,9 @@ public class StreamingController extends BasicController {
                         KafkaConfig kConfig = kafkaConfigService.getKafkaConfig(kafkaConfig.getName(), project);
                         kafkaConfigService.dropKafkaConfig(kConfig, project);
                     } catch (IOException e) {
-                        throw new InternalErrorException("Action failed and failed to rollback the created kafka config: " + e.getLocalizedMessage());
+                        throw new InternalErrorException(
+                                "Action failed and failed to rollback the created kafka config: "
+                                        + e.getLocalizedMessage());
                     }
                 }
             }
@@ -185,13 +198,14 @@ public class StreamingController extends BasicController {
 
     @RequestMapping(value = "", method = { RequestMethod.PUT }, produces = { "application/json" })
     @ResponseBody
-    public StreamingRequest updateStreamingConfig(@RequestBody StreamingRequest streamingRequest) throws JsonProcessingException {
+    public StreamingRequest updateStreamingConfig(@RequestBody StreamingRequest streamingRequest)
+            throws JsonProcessingException {
         StreamingConfig streamingConfig = deserializeSchemalDesc(streamingRequest);
-        if(!streamingRequest.isSuccessful()){
+        if (!streamingRequest.isSuccessful()) {
             return streamingRequest;
         }
         KafkaConfig kafkaConfig = deserializeKafkaSchemalDesc(streamingRequest);
-        if(!streamingRequest.isSuccessful()){
+        if (!streamingRequest.isSuccessful()) {
             return streamingRequest;
         }
         String project = streamingRequest.getProject();
@@ -220,7 +234,8 @@ public class StreamingController extends BasicController {
         return streamingRequest;
     }
 
-    @RequestMapping(value = "/{project}/{configName}", method = { RequestMethod.DELETE }, produces = { "application/json" })
+    @RequestMapping(value = "/{project}/{configName}", method = { RequestMethod.DELETE }, produces = {
+            "application/json" })
     @ResponseBody
     public void deleteConfig(@PathVariable String project, @PathVariable String configName) throws IOException {
         StreamingConfig config = streamingService.getStreamingManager().getStreamingConfig(configName);