You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/09/27 10:33:33 UTC

[incubator-tubemq] branch TUBEMQ-336 updated: [TUBEMQ-364] uniform response format for exception state (#278)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch TUBEMQ-336
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/TUBEMQ-336 by this push:
     new 1c177a7  [TUBEMQ-364] uniform response format for exception state (#278)
1c177a7 is described below

commit 1c177a7edee6d7506e961ab5b32064c595828c70
Author: Yuanbo Liu <yu...@apache.org>
AuthorDate: Sun Sep 27 18:33:26 2020 +0800

    [TUBEMQ-364] uniform response format for exception state (#278)
---
 .../org/apache/tubemq/manager/TubeMQManager.java   | 44 ++++++----
 .../tubemq/manager/backend/AbstractDaemon.java     | 97 ----------------------
 .../controller/ManagerControllerAdvice.java        | 46 ++++++++++
 .../{ => business}/BusinessController.java         | 45 ++++++----
 .../controller/{ => business}/BusinessResult.java  |  6 +-
 .../apache/tubemq/manager/entry/BusinessEntry.java |  7 +-
 .../TubeMQManagerException.java}                   | 23 ++---
 .../AsyncService.java}                             | 33 ++------
 .../manager/controller/TestBusinessController.java | 17 +++-
 9 files changed, 142 insertions(+), 176 deletions(-)

diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/TubeMQManager.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/TubeMQManager.java
index 5df581d..a25897b 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/TubeMQManager.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/TubeMQManager.java
@@ -16,30 +16,44 @@
  */
 package org.apache.tubemq.manager;
 
-import org.apache.tubemq.manager.backend.AbstractDaemon;
+import java.util.concurrent.Executor;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
 import org.springframework.data.jpa.repository.config.EnableJpaAuditing;
+import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
 @SpringBootApplication
 @EnableJpaAuditing
-public class TubeMQManager extends AbstractDaemon {
-    public static void main(String[] args) throws Exception {
-        TubeMQManager manager = new TubeMQManager();
-        manager.startThreads();
-        SpringApplication.run(TubeMQManager.class);
-        // web application stopped, then stop working threads.
-        manager.stopThreads();
-        manager.join();
-    }
+@EnableAsync
+public class TubeMQManager {
 
-    @Override
-    public void startThreads() throws Exception {
+    @Value("${manager.async.core.pool.size:2}")
+    private int asyncCorePoolSize;
 
-    }
+    @Value("${manager.async.max.pool.size:20}")
+    private int asyncMaxPoolSize;
+
+    @Value("${manager.async.queue.capacity:100}")
+    private int asyncQueueCapacity;
 
-    @Override
-    public void stopThreads() throws Exception {
+    @Value("${manager.async.thread.prefix:AsyncThread-}")
+    private String threadPrefix;
+
+    public static void main(String[] args) throws Exception {
+        SpringApplication.run(TubeMQManager.class);
+    }
 
+    @Bean(name = "asyncExecutor")
+    public Executor asyncExecutor() {
+        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+        executor.setCorePoolSize(asyncCorePoolSize);
+        executor.setMaxPoolSize(asyncMaxPoolSize);
+        executor.setQueueCapacity(asyncQueueCapacity);
+        executor.setThreadNamePrefix(threadPrefix);
+        executor.initialize();
+        return executor;
     }
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/backend/AbstractDaemon.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/backend/AbstractDaemon.java
deleted file mode 100644
index 2db9318..0000000
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/backend/AbstractDaemon.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tubemq.manager.backend;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Abstract daemon with a batch of working thread.
- */
-public abstract class AbstractDaemon implements ThreadStartAndStop {
-    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractDaemon.class);
-
-    // worker thread pool
-    private final ExecutorService workerServices;
-    private final List<CompletableFuture<?>> workerFutures;
-    private boolean runnable = true;
-
-    public AbstractDaemon() {
-        this.workerServices = Executors
-                .newCachedThreadPool(new TubeMQManagerFactory(this.getClass().getSimpleName()));
-        this.workerFutures = new ArrayList<>();
-    }
-
-    /**
-     * Whether threads can in running state with while loop.
-     *
-     * @return - true if threads can run
-     */
-    public boolean isRunnable() {
-        return runnable;
-    }
-
-    /**
-     * Stop running threads.
-     */
-    public void stopRunningThreads() {
-        runnable = false;
-    }
-
-    /**
-     * Submit work thread to thread pool.
-     *
-     * @param worker - work thread
-     */
-    public void submitWorker(Runnable worker) {
-        CompletableFuture<?> future = CompletableFuture.runAsync(worker, this.workerServices);
-        workerFutures.add(future);
-        LOGGER.info("{} running worker number is {}", this.getClass().getName(),
-                workerFutures.size());
-    }
-
-    /**
-     * Wait for threads finish.
-     */
-    public void join() {
-        for (CompletableFuture<?> future : workerFutures) {
-            future.join();
-        }
-    }
-
-    /**
-     * Stop thread pool and running threads if they're in the running state.
-     *
-     * @param timeout - max wait time
-     * @param timeUnit - time unit
-     */
-    public void waitForTerminate(long timeout, TimeUnit timeUnit) throws Exception {
-        // stopping working threads.
-        if (isRunnable()) {
-            stopRunningThreads();
-            workerServices.shutdown();
-            workerServices.awaitTermination(timeout, timeUnit);
-        }
-    }
-}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/ManagerControllerAdvice.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/ManagerControllerAdvice.java
new file mode 100644
index 0000000..33369ca
--- /dev/null
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/ManagerControllerAdvice.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.manager.controller;
+import javax.servlet.http.HttpServletRequest;
+import org.apache.tubemq.manager.controller.business.BusinessResult;
+import org.apache.tubemq.manager.exceptions.TubeMQManagerException;
+import org.springframework.web.bind.annotation.ExceptionHandler;
+import org.springframework.web.bind.annotation.RestControllerAdvice;
+
+/**
+ * Controller advice for handling exceptions
+ */
+@RestControllerAdvice
+public class ManagerControllerAdvice {
+
+    /**
+     * Handles a TubeMQManagerException and returns a BusinessResult describing the failure.
+     *
+     * @param request - http request
+     * @param ex - exception
+     * @return result with code -1 and the exception message
+     */
+    @ExceptionHandler(TubeMQManagerException.class)
+    public BusinessResult handlingBusinessException(HttpServletRequest request,
+            TubeMQManagerException ex) {
+        BusinessResult result = new BusinessResult();
+        result.setMessage(ex.getMessage());
+        result.setCode(-1);
+        return result;
+    }
+}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/BusinessController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/business/BusinessController.java
similarity index 68%
rename from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/BusinessController.java
rename to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/business/BusinessController.java
index 934c215..c8190a8 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/BusinessController.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/business/BusinessController.java
@@ -14,14 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tubemq.manager.controller;
+package org.apache.tubemq.manager.controller.business;
 
 import java.util.List;
 import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.tubemq.manager.entry.BusinessEntry;
+import org.apache.tubemq.manager.exceptions.TubeMQManagerException;
 import org.apache.tubemq.manager.repository.BusinessRepository;
+import org.apache.tubemq.manager.service.AsyncService;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.PostMapping;
@@ -32,11 +34,15 @@ import org.springframework.web.bind.annotation.RestController;
 
 @RestController
 @RequestMapping(path = "/business")
+@Slf4j
 public class BusinessController {
 
     @Autowired
     private BusinessRepository businessRepository;
 
+    @Autowired
+    private AsyncService asyncService;
+
     /**
      * add new business.
      *
@@ -44,9 +50,9 @@ public class BusinessController {
      * @throws Exception - exception
      */
     @PostMapping("/add")
-    public ResponseEntity<?> addBusiness(@RequestBody BusinessEntry entry) throws Exception {
-        // businessRepository.saveAndFlush(entry);
-        return ResponseEntity.ok().build();
+    public BusinessResult addBusiness(@RequestBody BusinessEntry entry) {
+        businessRepository.saveAndFlush(entry);
+        return new BusinessResult();
     }
 
     /**
@@ -56,8 +62,8 @@ public class BusinessController {
      * @throws Exception
      */
     @PostMapping("/update")
-    public ResponseEntity<?> updateBusiness(@RequestBody BusinessEntry entry) throws Exception {
-        return ResponseEntity.ok().build();
+    public BusinessResult updateBusiness(@RequestBody BusinessEntry entry) {
+        return new BusinessResult();
     }
 
     /**
@@ -67,10 +73,10 @@ public class BusinessController {
      * @throws Exception
      */
     @GetMapping("/check")
-    public ResponseEntity<?> checkBusinessByName(
-            @RequestParam String businessName) throws Exception {
+    public BusinessResult checkBusinessByName(
+            @RequestParam String businessName) {
         List<BusinessEntry> result = businessRepository.findAllByBusinessName(businessName);
-        return ResponseEntity.ok().build();
+        return new BusinessResult();
     }
 
     /**
@@ -81,13 +87,20 @@ public class BusinessController {
      * @throws Exception
      */
     @GetMapping("/get/{id}")
-    public ResponseEntity<BusinessEntry> getBusinessByID(
-            @PathVariable Long id) throws Exception {
+    public BusinessResult getBusinessByID(
+            @PathVariable Long id) {
         Optional<BusinessEntry> businessEntry = businessRepository.findById(id);
-        if (businessEntry.isPresent()) {
-            return ResponseEntity.ok().build();
-        } else {
-            return ResponseEntity.notFound().build();
+        BusinessResult result = new BusinessResult();
+        if (!businessEntry.isPresent()) {
+            result.setCode(-1);
+            result.setMessage("business not found");
         }
+        return result;
+    }
+
+
+    @GetMapping("/throwException")
+    public BusinessResult throwException() {
+        throw new TubeMQManagerException("exception for test");
     }
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/BusinessResult.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/business/BusinessResult.java
similarity index 89%
rename from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/BusinessResult.java
rename to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/business/BusinessResult.java
index 6e4a6f6..88c39ae 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/BusinessResult.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/business/BusinessResult.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tubemq.manager.controller;
+package org.apache.tubemq.manager.controller.business;
 
 import lombok.Data;
 
@@ -23,6 +23,6 @@ import lombok.Data;
  */
 @Data
 public class BusinessResult {
-    private int state;
-    private String msg;
+    private String message;
+    private int code = 0;
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/BusinessEntry.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/BusinessEntry.java
index d56b0f2..88e8e1e 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/BusinessEntry.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/BusinessEntry.java
@@ -120,9 +120,6 @@ public class BusinessEntry {
     @Size(max = 32)
     private String issueMethod;
 
-    private BusinessEntry() {
-
-    }
 
     public BusinessEntry(String businessName, String schemaName,
             String username, String passwd, String topic, String encodingType) {
@@ -133,4 +130,8 @@ public class BusinessEntry {
         this.topic = topic;
         this.encodingType = encodingType;
     }
+
+    public BusinessEntry() {
+
+    }
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/backend/ThreadStartAndStop.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/exceptions/TubeMQManagerException.java
similarity index 67%
rename from tubemq-manager/src/main/java/org/apache/tubemq/manager/backend/ThreadStartAndStop.java
rename to tubemq-manager/src/main/java/org/apache/tubemq/manager/exceptions/TubeMQManagerException.java
index d3cacef..46c888c 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/backend/ThreadStartAndStop.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/exceptions/TubeMQManagerException.java
@@ -14,24 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tubemq.manager.backend;
+
+package org.apache.tubemq.manager.exceptions;
 
 /**
- * Interface for starting and stopping backend threads.
+ * TubeMQ runtime exception.
  */
-public interface ThreadStartAndStop {
-    /**
-     * start all threads.
-     */
-    void startThreads() throws Exception;
-
-    /**
-     * stop all threads.
-     */
-    void stopThreads() throws Exception;
+public class TubeMQManagerException extends RuntimeException {
 
-    /**
-     * wait for all thread finishing.
-     */
-    void join() throws Exception;
+    public TubeMQManagerException(final String message) {
+        super(message);
+    }
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/backend/TubeMQManagerFactory.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/AsyncService.java
similarity index 50%
rename from tubemq-manager/src/main/java/org/apache/tubemq/manager/backend/TubeMQManagerFactory.java
rename to tubemq-manager/src/main/java/org/apache/tubemq/manager/service/AsyncService.java
index ca72901..335f1d0 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/backend/TubeMQManagerFactory.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/AsyncService.java
@@ -15,32 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.tubemq.manager.backend;
+package org.apache.tubemq.manager.service;
 
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
 
 /**
- * Thread factory for tubeMQ manager.
+ * Service for running async tasks.
+ * https://howtodoinjava.com/spring-boot2/rest/enableasync-async-controller/
  */
-public class TubeMQManagerFactory implements ThreadFactory {
+@Service
+@Slf4j
+public class AsyncService {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(TubeMQManagerFactory.class);
-
-    private final AtomicInteger mThreadNum = new AtomicInteger(1);
-
-    private final String threadType;
-
-    public TubeMQManagerFactory(String threadType) {
-        this.threadType = threadType;
-    }
-
-    @Override
-    public Thread newThread(Runnable r) {
-        Thread t = new Thread(r, threadType + "-running-thread-" + mThreadNum.getAndIncrement());
-        LOGGER.info("{} created", t.getName());
-        return t;
-    }
-}
\ No newline at end of file
+}
diff --git a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java
index e934081..2ddfb67 100644
--- a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java
+++ b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java
@@ -17,7 +17,10 @@
 package org.apache.tubemq.manager.controller;
 
 import java.net.URI;
+import java.util.Objects;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.tubemq.manager.controller.business.BusinessController;
+import org.apache.tubemq.manager.controller.business.BusinessResult;
 import org.apache.tubemq.manager.entry.BusinessEntry;
 import org.junit.Before;
 import org.junit.Test;
@@ -79,8 +82,18 @@ public class TestBusinessController {
         HttpHeaders headers = new HttpHeaders();
         HttpEntity<BusinessEntry> request = new HttpEntity<>(entry, headers);
 
-        ResponseEntity<?> responseEntity =
-                client.postForEntity(uri, request, ResponseEntity.class);
+        ResponseEntity<BusinessResult> responseEntity =
+                client.postForEntity(uri, request, BusinessResult.class);
         assertThat(responseEntity.getStatusCode().is2xxSuccessful()).isEqualTo(true);
     }
+
+    @Test
+    public void testControllerException() throws Exception {
+        final String baseUrl = "http://localhost:" + randomServerPort + "/business/throwException";
+        URI uri = new URI(baseUrl);
+        ResponseEntity<BusinessResult> responseEntity =
+                client.getForEntity(uri, BusinessResult.class);
+        assertThat(Objects.requireNonNull(responseEntity.getBody()).getCode()).isEqualTo(-1);
+        assertThat(responseEntity.getBody().getMessage()).isEqualTo("exception for test");
+    }
 }