You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by te...@apache.org on 2020/08/31 07:22:44 UTC

[shardingsphere-elasticjob] branch master updated: Improve HTTP job executor. (#1432)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b920e1  Improve HTTP job executor. (#1432)
2b920e1 is described below

commit 2b920e1b8c6ee9601754ae383c755b183f0dbf7e
Author: 吴伟杰 <ro...@me.com>
AuthorDate: Mon Aug 31 15:22:36 2020 +0800

    Improve HTTP job executor. (#1432)
---
 .../elasticjob-http-executor/pom.xml               |  9 ++++
 .../elasticjob/http/executor/HttpJobExecutor.java  | 48 ++++++++++++----------
 .../http/executor/HttpJobExecutorTest.java         |  2 +-
 3 files changed, 37 insertions(+), 22 deletions(-)

diff --git a/elasticjob-executor/elasticjob-executor-type/elasticjob-http-executor/pom.xml b/elasticjob-executor/elasticjob-executor-type/elasticjob-http-executor/pom.xml
index 3077e60..8eb6ec6 100644
--- a/elasticjob-executor/elasticjob-executor-type/elasticjob-http-executor/pom.xml
+++ b/elasticjob-executor/elasticjob-executor-type/elasticjob-http-executor/pom.xml
@@ -45,11 +45,20 @@
         <dependency>
             <groupId>org.projectlombok</groupId>
             <artifactId>lombok</artifactId>
+            <scope>provided</scope>
         </dependency>
         
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-jdk14</artifactId>
+            <optional>true</optional>
+            <scope>test</scope>
         </dependency>
     </dependencies>
 </project>
diff --git a/elasticjob-executor/elasticjob-executor-type/elasticjob-http-executor/src/main/java/org/apache/shardingsphere/elasticjob/http/executor/HttpJobExecutor.java b/elasticjob-executor/elasticjob-executor-type/elasticjob-http-executor/src/main/java/org/apache/shardingsphere/elasticjob/http/executor/HttpJobExecutor.java
index bea3f8c..123cc08 100644
--- a/elasticjob-executor/elasticjob-executor-type/elasticjob-http-executor/src/main/java/org/apache/shardingsphere/elasticjob/http/executor/HttpJobExecutor.java
+++ b/elasticjob-executor/elasticjob-executor-type/elasticjob-http-executor/src/main/java/org/apache/shardingsphere/elasticjob/http/executor/HttpJobExecutor.java
@@ -31,9 +31,10 @@ import org.apache.shardingsphere.elasticjob.infra.exception.JobExecutionExceptio
 import org.apache.shardingsphere.elasticjob.infra.json.GsonFactory;
 
 import java.io.BufferedReader;
-import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.io.OutputStream;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
@@ -50,7 +51,6 @@ public final class HttpJobExecutor implements TypedJobItemExecutor {
     public void process(final ElasticJob elasticJob, final JobConfiguration jobConfig, final JobFacade jobFacade, final ShardingContext shardingContext) {
         HttpParam httpParam = getHttpParam(jobConfig.getProps());
         HttpURLConnection connection = null;
-        BufferedReader bufferedReader = null;
         try {
             URL url = new URL(httpParam.getUrl());
             connection = (HttpURLConnection) url.openConnection();
@@ -65,33 +65,35 @@ public final class HttpJobExecutor implements TypedJobItemExecutor {
             connection.connect();
             String data = httpParam.getData();
             if (isWriteMethod(httpParam.getMethod()) && !Strings.isNullOrEmpty(data)) {
-                DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream());
-                dataOutputStream.write(data.getBytes(StandardCharsets.UTF_8));
-                dataOutputStream.flush();
-                dataOutputStream.close();
+                try (OutputStream outputStream = connection.getOutputStream()) {
+                    outputStream.write(data.getBytes(StandardCharsets.UTF_8));
+                }
             }
             int code = connection.getResponseCode();
-            if (code != 200) {
-                throw new JobExecutionException("Http job %s executed with response code %d", jobConfig.getJobName(), code);
+            InputStream resultInputStream;
+            if (isRequestSucceed(code)) {
+                resultInputStream = connection.getInputStream();
+            } else {
+                log.warn("Http job {} executed with response code {}", jobConfig.getJobName(), code);
+                resultInputStream = connection.getErrorStream();
             }
-            bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8));
             StringBuilder result = new StringBuilder();
-            String line;
-            while ((line = bufferedReader.readLine()) != null) {
-                result.append(line);
+            try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(resultInputStream, StandardCharsets.UTF_8))) {
+                String line;
+                while (null != (line = bufferedReader.readLine())) {
+                    result.append(line);
+                }
+            }
+            if (isRequestSucceed(code)) {
+                log.debug("http job execute result : {}", result.toString());
+            } else {
+                log.warn("Http job {} executed with response body {}", jobConfig.getJobName(), result.toString());
             }
-            log.debug("http job execute result : {}", result.toString());
         } catch (final IOException ex) {
             throw new JobExecutionException(ex);
         } finally {
-            try {
-                if (null != bufferedReader) {
-                    bufferedReader.close();
-                }
-                if (null != connection) {
-                    connection.disconnect();
-                }
-            } catch (final IOException ignore) {
+            if (null != connection) {
+                connection.disconnect();
             }
         }
     }
@@ -116,6 +118,10 @@ public final class HttpJobExecutor implements TypedJobItemExecutor {
         return Arrays.asList("POST", "PUT", "DELETE").contains(method.toUpperCase());
     }
     
+    private boolean isRequestSucceed(final int httpStatusCode) {
+        return HttpURLConnection.HTTP_BAD_REQUEST > httpStatusCode;
+    }
+    
     @Override
     public String getType() {
         return "HTTP";
diff --git a/elasticjob-executor/elasticjob-executor-type/elasticjob-http-executor/src/test/java/org/apache/shardingsphere/elasticjob/http/executor/HttpJobExecutorTest.java b/elasticjob-executor/elasticjob-executor-type/elasticjob-http-executor/src/test/java/org/apache/shardingsphere/elasticjob/http/executor/HttpJobExecutorTest.java
index e4c6243..b9cddde 100644
--- a/elasticjob-executor/elasticjob-executor-type/elasticjob-http-executor/src/test/java/org/apache/shardingsphere/elasticjob/http/executor/HttpJobExecutorTest.java
+++ b/elasticjob-executor/elasticjob-executor-type/elasticjob-http-executor/src/test/java/org/apache/shardingsphere/elasticjob/http/executor/HttpJobExecutorTest.java
@@ -103,7 +103,7 @@ public final class HttpJobExecutorTest {
         jobExecutor.process(elasticJob, jobConfig, jobFacade, shardingContext);
     }
     
-    @Test(expected = JobExecutionException.class)
+    @Test
     public void assertProcessWithoutSuccessCode() {
         when(jobConfig.getProps().getProperty(HttpJobProperties.URI_KEY)).thenReturn(getRequestUri("/unknownMethod"));
         when(jobConfig.getProps().getProperty(HttpJobProperties.METHOD_KEY)).thenReturn("GET");