You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/09/27 10:33:33 UTC
[incubator-tubemq] branch TUBEMQ-336 updated: [TUBEMQ-364] uniform
response format for exception state (#278)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch TUBEMQ-336
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/TUBEMQ-336 by this push:
new 1c177a7 [TUBEMQ-364] uniform response format for exception state (#278)
1c177a7 is described below
commit 1c177a7edee6d7506e961ab5b32064c595828c70
Author: Yuanbo Liu <yu...@apache.org>
AuthorDate: Sun Sep 27 18:33:26 2020 +0800
[TUBEMQ-364] uniform response format for exception state (#278)
---
.../org/apache/tubemq/manager/TubeMQManager.java | 44 ++++++----
.../tubemq/manager/backend/AbstractDaemon.java | 97 ----------------------
.../controller/ManagerControllerAdvice.java | 46 ++++++++++
.../{ => business}/BusinessController.java | 45 ++++++----
.../controller/{ => business}/BusinessResult.java | 6 +-
.../apache/tubemq/manager/entry/BusinessEntry.java | 7 +-
.../TubeMQManagerException.java} | 23 ++---
.../AsyncService.java} | 33 ++------
.../manager/controller/TestBusinessController.java | 17 +++-
9 files changed, 142 insertions(+), 176 deletions(-)
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/TubeMQManager.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/TubeMQManager.java
index 5df581d..a25897b 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/TubeMQManager.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/TubeMQManager.java
@@ -16,30 +16,44 @@
*/
package org.apache.tubemq.manager;
-import org.apache.tubemq.manager.backend.AbstractDaemon;
+import java.util.concurrent.Executor;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
import org.springframework.data.jpa.repository.config.EnableJpaAuditing;
+import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@SpringBootApplication
@EnableJpaAuditing
-public class TubeMQManager extends AbstractDaemon {
- public static void main(String[] args) throws Exception {
- TubeMQManager manager = new TubeMQManager();
- manager.startThreads();
- SpringApplication.run(TubeMQManager.class);
- // web application stopped, then stop working threads.
- manager.stopThreads();
- manager.join();
- }
+@EnableAsync
+public class TubeMQManager {
- @Override
- public void startThreads() throws Exception {
+ @Value("${manager.async.core.pool.size:2}")
+ private int asyncCorePoolSize;
- }
+ @Value("${manager.async.max.pool.size:20}")
+ private int asyncMaxPoolSize;
+
+ @Value("${manager.async.queue.capacity:100}")
+ private int asyncQueueCapacity;
- @Override
- public void stopThreads() throws Exception {
+ @Value("${manager.async.thread.prefix:AsyncThread-}")
+ private String threadPrefix;
+
+ public static void main(String[] args) throws Exception {
+ SpringApplication.run(TubeMQManager.class);
+ }
+ @Bean(name = "asyncExecutor")
+ public Executor asyncExecutor() {
+ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+ executor.setCorePoolSize(asyncCorePoolSize);
+ executor.setMaxPoolSize(asyncMaxPoolSize);
+ executor.setQueueCapacity(asyncQueueCapacity);
+ executor.setThreadNamePrefix(threadPrefix);
+ executor.initialize();
+ return executor;
}
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/backend/AbstractDaemon.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/backend/AbstractDaemon.java
deleted file mode 100644
index 2db9318..0000000
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/backend/AbstractDaemon.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tubemq.manager.backend;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Abstract daemon with a batch of working thread.
- */
-public abstract class AbstractDaemon implements ThreadStartAndStop {
- private static final Logger LOGGER = LoggerFactory.getLogger(AbstractDaemon.class);
-
- // worker thread pool
- private final ExecutorService workerServices;
- private final List<CompletableFuture<?>> workerFutures;
- private boolean runnable = true;
-
- public AbstractDaemon() {
- this.workerServices = Executors
- .newCachedThreadPool(new TubeMQManagerFactory(this.getClass().getSimpleName()));
- this.workerFutures = new ArrayList<>();
- }
-
- /**
- * Whether threads can in running state with while loop.
- *
- * @return - true if threads can run
- */
- public boolean isRunnable() {
- return runnable;
- }
-
- /**
- * Stop running threads.
- */
- public void stopRunningThreads() {
- runnable = false;
- }
-
- /**
- * Submit work thread to thread pool.
- *
- * @param worker - work thread
- */
- public void submitWorker(Runnable worker) {
- CompletableFuture<?> future = CompletableFuture.runAsync(worker, this.workerServices);
- workerFutures.add(future);
- LOGGER.info("{} running worker number is {}", this.getClass().getName(),
- workerFutures.size());
- }
-
- /**
- * Wait for threads finish.
- */
- public void join() {
- for (CompletableFuture<?> future : workerFutures) {
- future.join();
- }
- }
-
- /**
- * Stop thread pool and running threads if they're in the running state.
- *
- * @param timeout - max wait time
- * @param timeUnit - time unit
- */
- public void waitForTerminate(long timeout, TimeUnit timeUnit) throws Exception {
- // stopping working threads.
- if (isRunnable()) {
- stopRunningThreads();
- workerServices.shutdown();
- workerServices.awaitTermination(timeout, timeUnit);
- }
- }
-}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/ManagerControllerAdvice.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/ManagerControllerAdvice.java
new file mode 100644
index 0000000..33369ca
--- /dev/null
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/ManagerControllerAdvice.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.manager.controller;
+import javax.servlet.http.HttpServletRequest;
+import org.apache.tubemq.manager.controller.business.BusinessResult;
+import org.apache.tubemq.manager.exceptions.TubeMQManagerException;
+import org.springframework.web.bind.annotation.ExceptionHandler;
+import org.springframework.web.bind.annotation.RestControllerAdvice;
+
+/**
+ * Controller advice for handling exceptions
+ */
+@RestControllerAdvice
+public class ManagerControllerAdvice {
+
+ /**
+ * Handles a business TubeMQManagerException and returns the error details as a JSON-formatted result.
+ *
+ * @param request - http request
+ * @param ex - exception
+ * @return result carrying the exception message and the error code -1
+ */
+ @ExceptionHandler(TubeMQManagerException.class)
+ public BusinessResult handlingBusinessException(HttpServletRequest request,
+ TubeMQManagerException ex) {
+ BusinessResult result = new BusinessResult();
+ result.setMessage(ex.getMessage());
+ result.setCode(-1);
+ return result;
+ }
+}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/BusinessController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/business/BusinessController.java
similarity index 68%
rename from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/BusinessController.java
rename to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/business/BusinessController.java
index 934c215..c8190a8 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/BusinessController.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/business/BusinessController.java
@@ -14,14 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.tubemq.manager.controller;
+package org.apache.tubemq.manager.controller.business;
import java.util.List;
import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
import org.apache.tubemq.manager.entry.BusinessEntry;
+import org.apache.tubemq.manager.exceptions.TubeMQManagerException;
import org.apache.tubemq.manager.repository.BusinessRepository;
+import org.apache.tubemq.manager.service.AsyncService;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
@@ -32,11 +34,15 @@ import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping(path = "/business")
+@Slf4j
public class BusinessController {
@Autowired
private BusinessRepository businessRepository;
+ @Autowired
+ private AsyncService asyncService;
+
/**
* add new business.
*
@@ -44,9 +50,9 @@ public class BusinessController {
* @throws Exception - exception
*/
@PostMapping("/add")
- public ResponseEntity<?> addBusiness(@RequestBody BusinessEntry entry) throws Exception {
- // businessRepository.saveAndFlush(entry);
- return ResponseEntity.ok().build();
+ public BusinessResult addBusiness(@RequestBody BusinessEntry entry) {
+ businessRepository.saveAndFlush(entry);
+ return new BusinessResult();
}
/**
@@ -56,8 +62,8 @@ public class BusinessController {
* @throws Exception
*/
@PostMapping("/update")
- public ResponseEntity<?> updateBusiness(@RequestBody BusinessEntry entry) throws Exception {
- return ResponseEntity.ok().build();
+ public BusinessResult updateBusiness(@RequestBody BusinessEntry entry) {
+ return new BusinessResult();
}
/**
@@ -67,10 +73,10 @@ public class BusinessController {
* @throws Exception
*/
@GetMapping("/check")
- public ResponseEntity<?> checkBusinessByName(
- @RequestParam String businessName) throws Exception {
+ public BusinessResult checkBusinessByName(
+ @RequestParam String businessName) {
List<BusinessEntry> result = businessRepository.findAllByBusinessName(businessName);
- return ResponseEntity.ok().build();
+ return new BusinessResult();
}
/**
@@ -81,13 +87,20 @@ public class BusinessController {
* @throws Exception
*/
@GetMapping("/get/{id}")
- public ResponseEntity<BusinessEntry> getBusinessByID(
- @PathVariable Long id) throws Exception {
+ public BusinessResult getBusinessByID(
+ @PathVariable Long id) {
Optional<BusinessEntry> businessEntry = businessRepository.findById(id);
- if (businessEntry.isPresent()) {
- return ResponseEntity.ok().build();
- } else {
- return ResponseEntity.notFound().build();
+ BusinessResult result = new BusinessResult();
+ if (!businessEntry.isPresent()) {
+ result.setCode(-1);
+ result.setMessage("business not found");
}
+ return result;
+ }
+
+
+ @GetMapping("/throwException")
+ public BusinessResult throwException() {
+ throw new TubeMQManagerException("exception for test");
}
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/BusinessResult.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/business/BusinessResult.java
similarity index 89%
rename from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/BusinessResult.java
rename to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/business/BusinessResult.java
index 6e4a6f6..88c39ae 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/BusinessResult.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/business/BusinessResult.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.tubemq.manager.controller;
+package org.apache.tubemq.manager.controller.business;
import lombok.Data;
@@ -23,6 +23,6 @@ import lombok.Data;
*/
@Data
public class BusinessResult {
- private int state;
- private String msg;
+ private String message;
+ private int code = 0;
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/BusinessEntry.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/BusinessEntry.java
index d56b0f2..88e8e1e 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/BusinessEntry.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/BusinessEntry.java
@@ -120,9 +120,6 @@ public class BusinessEntry {
@Size(max = 32)
private String issueMethod;
- private BusinessEntry() {
-
- }
public BusinessEntry(String businessName, String schemaName,
String username, String passwd, String topic, String encodingType) {
@@ -133,4 +130,8 @@ public class BusinessEntry {
this.topic = topic;
this.encodingType = encodingType;
}
+
+ public BusinessEntry() {
+
+ }
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/backend/ThreadStartAndStop.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/exceptions/TubeMQManagerException.java
similarity index 67%
rename from tubemq-manager/src/main/java/org/apache/tubemq/manager/backend/ThreadStartAndStop.java
rename to tubemq-manager/src/main/java/org/apache/tubemq/manager/exceptions/TubeMQManagerException.java
index d3cacef..46c888c 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/backend/ThreadStartAndStop.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/exceptions/TubeMQManagerException.java
@@ -14,24 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.tubemq.manager.backend;
+
+package org.apache.tubemq.manager.exceptions;
/**
- * Interface for starting and stopping backend threads.
+ * TubeMQ runtime exception.
*/
-public interface ThreadStartAndStop {
- /**
- * start all threads.
- */
- void startThreads() throws Exception;
-
- /**
- * stop all threads.
- */
- void stopThreads() throws Exception;
+public class TubeMQManagerException extends RuntimeException {
- /**
- * wait for all thread finishing.
- */
- void join() throws Exception;
+ public TubeMQManagerException(final String message) {
+ super(message);
+ }
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/backend/TubeMQManagerFactory.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/AsyncService.java
similarity index 50%
rename from tubemq-manager/src/main/java/org/apache/tubemq/manager/backend/TubeMQManagerFactory.java
rename to tubemq-manager/src/main/java/org/apache/tubemq/manager/service/AsyncService.java
index ca72901..335f1d0 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/backend/TubeMQManagerFactory.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/AsyncService.java
@@ -15,32 +15,17 @@
* limitations under the License.
*/
-package org.apache.tubemq.manager.backend;
+package org.apache.tubemq.manager.service;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
/**
- * Thread factory for tubeMQ manager.
+ * Service for running async tasks.
+ * https://howtodoinjava.com/spring-boot2/rest/enableasync-async-controller/
*/
-public class TubeMQManagerFactory implements ThreadFactory {
+@Service
+@Slf4j
+public class AsyncService {
- private static final Logger LOGGER = LoggerFactory.getLogger(TubeMQManagerFactory.class);
-
- private final AtomicInteger mThreadNum = new AtomicInteger(1);
-
- private final String threadType;
-
- public TubeMQManagerFactory(String threadType) {
- this.threadType = threadType;
- }
-
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r, threadType + "-running-thread-" + mThreadNum.getAndIncrement());
- LOGGER.info("{} created", t.getName());
- return t;
- }
-}
\ No newline at end of file
+}
diff --git a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java
index e934081..2ddfb67 100644
--- a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java
+++ b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java
@@ -17,7 +17,10 @@
package org.apache.tubemq.manager.controller;
import java.net.URI;
+import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
+import org.apache.tubemq.manager.controller.business.BusinessController;
+import org.apache.tubemq.manager.controller.business.BusinessResult;
import org.apache.tubemq.manager.entry.BusinessEntry;
import org.junit.Before;
import org.junit.Test;
@@ -79,8 +82,18 @@ public class TestBusinessController {
HttpHeaders headers = new HttpHeaders();
HttpEntity<BusinessEntry> request = new HttpEntity<>(entry, headers);
- ResponseEntity<?> responseEntity =
- client.postForEntity(uri, request, ResponseEntity.class);
+ ResponseEntity<BusinessResult> responseEntity =
+ client.postForEntity(uri, request, BusinessResult.class);
assertThat(responseEntity.getStatusCode().is2xxSuccessful()).isEqualTo(true);
}
+
+ @Test
+ public void testControllerException() throws Exception {
+ final String baseUrl = "http://localhost:" + randomServerPort + "/business/throwException";
+ URI uri = new URI(baseUrl);
+ ResponseEntity<BusinessResult> responseEntity =
+ client.getForEntity(uri, BusinessResult.class);
+ assertThat(Objects.requireNonNull(responseEntity.getBody()).getCode()).isEqualTo(-1);
+ assertThat(responseEntity.getBody().getMessage()).isEqualTo("exception for test");
+ }
}