You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/04/12 11:11:11 UTC

[incubator-seatunnel] branch dev updated: [WIP]code cleanup (#1684)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8e90c5ed [WIP]code cleanup (#1684)
8e90c5ed is described below

commit 8e90c5ede9512faa0d7bec8026f39f2cf3b788c1
Author: Kirs <ki...@apache.org>
AuthorDate: Tue Apr 12 19:11:06 2022 +0800

    [WIP]code cleanup (#1684)
    
    * [WIP]code cleanup
    
    * improve resource close method and optimize some methods
    
    * smell
    
    * Bump jackson-databind from 2.11.0 to 2.12.6.1
---
 pom.xml                                            |  3 +-
 .../seatunnel/flink/jdbc/source/JdbcSource.java    |  8 +-
 .../spark/http/source/util/HttpClientUtils.java    | 98 ++++++++--------------
 .../command/flink/FlinkTaskExecuteCommand.java     | 10 +--
 .../command/spark/SparkTaskExecuteCommand.java     |  9 +-
 seatunnel-dist/release-docs/LICENSE                |  2 +-
 tools/dependencies/known-dependencies.txt          |  2 +-
 7 files changed, 52 insertions(+), 80 deletions(-)

diff --git a/pom.xml b/pom.xml
index 02da41ce..d91947be 100644
--- a/pom.xml
+++ b/pom.xml
@@ -101,6 +101,7 @@
         <hadoop.version>2.7.5</hadoop.version>
         <fastjson.version>1.2.60</fastjson.version>
         <jackson.version>2.11.0</jackson.version>
+        <jackson-databind.version>2.12.6.1</jackson-databind.version>
         <lombok.version>1.18.0</lombok.version>
         <mysql.version>8.0.16</mysql.version>
         <postgresql.version>42.3.3</postgresql.version>
@@ -502,7 +503,7 @@
             <dependency>
                 <groupId>com.fasterxml.jackson.core</groupId>
                 <artifactId>jackson-databind</artifactId>
-                <version>${jackson.version}</version>
+                <version>${jackson-databind.version}</version>
             </dependency>
 
             <dependency>
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java
index 96fdc211..cb603884 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java
@@ -193,9 +193,10 @@ public class JdbcSource implements FlinkBatchSource {
         if (config.hasPath(PARTITION_UPPER_BOUND) && config.hasPath(PARTITION_LOWER_BOUND)) {
             max = config.getLong(PARTITION_UPPER_BOUND);
             min = config.getLong(PARTITION_LOWER_BOUND);
-        } else {
-            ResultSet rs = connection.createStatement().executeQuery(String.format("SELECT MAX(%s),MIN(%s) " +
-                    "FROM %s", columnName, columnName, tableName));
+            return new JdbcNumericBetweenParametersProvider(min, max).ofBatchNum(parallelism * 2);
+        }
+        try (ResultSet rs = connection.createStatement().executeQuery(String.format("SELECT MAX(%s),MIN(%s) " +
+                "FROM %s", columnName, columnName, tableName))) {
             if (rs.next()) {
                 max = config.hasPath(PARTITION_UPPER_BOUND) ? config.getLong(PARTITION_UPPER_BOUND) :
                         Long.parseLong(rs.getString(1));
@@ -203,7 +204,6 @@ public class JdbcSource implements FlinkBatchSource {
                         Long.parseLong(rs.getString(2));
             }
         }
-
         return new JdbcNumericBetweenParametersProvider(min, max).ofBatchNum(parallelism * 2);
     }
 
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-http/src/main/java/org/apache/seatunnel/spark/http/source/util/HttpClientUtils.java b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-http/src/main/java/org/apache/seatunnel/spark/http/source/util/HttpClientUtils.java
index ffd1947b..8dd3c6bd 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-http/src/main/java/org/apache/seatunnel/spark/http/source/util/HttpClientUtils.java
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-http/src/main/java/org/apache/seatunnel/spark/http/source/util/HttpClientUtils.java
@@ -45,6 +45,10 @@ import java.util.Set;
 
 public class HttpClientUtils {
 
+    private HttpClientUtils() {
+        throw new IllegalStateException("Utility class");
+    }
+
     private static final String ENCODING = "UTF-8";
     private static final int CONNECT_TIMEOUT = 6000 * 2;
     private static final int SOCKET_TIMEOUT = 6000 * 10;
@@ -83,9 +87,6 @@ public class HttpClientUtils {
      * @throws Exception information
      */
     public static HttpClientResult doGet(String url, Map<String, String> headers, Map<String, String> params) throws Exception {
-        // Create httpClient object
-        CloseableHttpClient httpClient = HttpClients.createDefault();
-
         // Create access address
         URIBuilder uriBuilder = new URIBuilder(url);
         if (params != null) {
@@ -95,27 +96,19 @@ public class HttpClientUtils {
             }
         }
 
-        HttpGet httpGet = new HttpGet(uriBuilder.build());
         /**
          * setConnectTimeout:Set the connection timeout, in milliseconds.
          * setSocketTimeout:The timeout period (ie response time) for requesting data acquisition, in milliseconds.
          * If an interface is accessed, and the data cannot be returned within a certain amount of time, the call is simply abandoned.
          */
         RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
+        HttpGet httpGet = new HttpGet(uriBuilder.build());
         httpGet.setConfig(requestConfig);
-
-        // set request header
-        packageHeader(headers, httpGet);
-
-        // Create httpResponse object
-        CloseableHttpResponse httpResponse = null;
-
-        try {
+        try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
+            // set request header
+            packageHeader(headers, httpGet);
             // Execute the request and get the response result
-            return getHttpClientResult(httpResponse, httpClient, httpGet);
-        } finally {
-            // release resources
-            release(httpResponse, httpClient);
+            return getHttpClientResult(httpClient, httpGet);
         }
     }
 
@@ -152,8 +145,6 @@ public class HttpClientUtils {
      * @throws Exception information
      */
     public static HttpClientResult doPost(String url, Map<String, String> headers, Map<String, String> params) throws Exception {
-        // Create httpClient object
-        CloseableHttpClient httpClient = HttpClients.createDefault();
         HttpPost httpPost = new HttpPost(url);
         /**
          * setConnectTimeout:Set the connection timeout, in milliseconds.
@@ -168,15 +159,9 @@ public class HttpClientUtils {
         // Encapsulate request parameters
         packageParam(params, httpPost);
 
-        // Create httpResponse object
-        CloseableHttpResponse httpResponse = null;
-
-        try {
+        try (CloseableHttpClient httpClient = HttpClients.createDefault();) {
             // Execute the request and get the response result
-            return getHttpClientResult(httpResponse, httpClient, httpPost);
-        } finally {
-            // release resources
-            release(httpResponse, httpClient);
+            return getHttpClientResult(httpClient, httpPost);
         }
     }
 
@@ -192,7 +177,7 @@ public class HttpClientUtils {
     }
 
     /**
-     *Send a put request with request parameters
+     * Send a put request with request parameters
      *
      * @param url    request address
      * @param params request parameter map
@@ -200,19 +185,15 @@ public class HttpClientUtils {
      * @throws Exception information
      */
     public static HttpClientResult doPut(String url, Map<String, String> params) throws Exception {
-        CloseableHttpClient httpClient = HttpClients.createDefault();
+
         HttpPut httpPut = new HttpPut(url);
         RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
         httpPut.setConfig(requestConfig);
 
         packageParam(params, httpPut);
 
-        CloseableHttpResponse httpResponse = null;
-
-        try {
-            return getHttpClientResult(httpResponse, httpClient, httpPut);
-        } finally {
-            release(httpResponse, httpClient);
+        try (CloseableHttpClient httpClient = HttpClients.createDefault();) {
+            return getHttpClientResult(httpClient, httpPut);
         }
     }
 
@@ -224,16 +205,12 @@ public class HttpClientUtils {
      * @throws Exception information
      */
     public static HttpClientResult doDelete(String url) throws Exception {
-        CloseableHttpClient httpClient = HttpClients.createDefault();
+
         HttpDelete httpDelete = new HttpDelete(url);
         RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
         httpDelete.setConfig(requestConfig);
-
-        CloseableHttpResponse httpResponse = null;
-        try {
-            return getHttpClientResult(httpResponse, httpClient, httpDelete);
-        } finally {
-            release(httpResponse, httpClient);
+        try (CloseableHttpClient httpClient = HttpClients.createDefault();) {
+            return getHttpClientResult(httpClient, httpDelete);
         }
     }
 
@@ -247,7 +224,7 @@ public class HttpClientUtils {
      */
     public static HttpClientResult doDelete(String url, Map<String, String> params) throws Exception {
         if (params == null) {
-            params = new HashMap<String, String>(INITIAL_CAPACITY);
+            params = new HashMap<>(INITIAL_CAPACITY);
         }
 
         params.put("_method", "delete");
@@ -255,9 +232,9 @@ public class HttpClientUtils {
     }
 
     /**
-     *encapsulate request header
+     * encapsulate request header
      *
-     * @param params request header map
+     * @param params     request header map
      * @param httpMethod http request method
      */
     public static void packageHeader(Map<String, String> params, HttpRequestBase httpMethod) {
@@ -274,15 +251,14 @@ public class HttpClientUtils {
     /**
      * Encapsulate request parameters
      *
-     * @param params  request parameter map
+     * @param params     request parameter map
      * @param httpMethod http request method
      * @throws UnsupportedEncodingException exception information
      */
-    public static void packageParam(Map<String, String> params, HttpEntityEnclosingRequestBase httpMethod)
-        throws UnsupportedEncodingException {
+    public static void packageParam(Map<String, String> params, HttpEntityEnclosingRequestBase httpMethod) throws UnsupportedEncodingException {
         // Encapsulate request parameters
         if (params != null) {
-            List<NameValuePair> nvps = new ArrayList<NameValuePair>();
+            List<NameValuePair> nvps = new ArrayList<>();
             Set<Entry<String, String>> entrySet = params.entrySet();
             for (Entry<String, String> entry : entrySet) {
                 nvps.add(new BasicNameValuePair(entry.getKey(), entry.getValue()));
@@ -296,24 +272,22 @@ public class HttpClientUtils {
     /**
      * get response result
      *
-     * @param httpResponse http response object
-     * @param httpClient   http client object
-     * @param httpMethod   http method onject
+     * @param httpClient http client object
+     * @param httpMethod http method object
      * @return http response result
      * @throws Exception information
      */
-    public static HttpClientResult getHttpClientResult(CloseableHttpResponse httpResponse,
-        CloseableHttpClient httpClient, HttpRequestBase httpMethod) throws Exception {
+    public static HttpClientResult getHttpClientResult(CloseableHttpClient httpClient, HttpRequestBase httpMethod) throws Exception {
         // execute request
-        httpResponse = httpClient.execute(httpMethod);
-
-        // get return result
-        if (httpResponse != null && httpResponse.getStatusLine() != null) {
-            String content = "";
-            if (httpResponse.getEntity() != null) {
-                content = EntityUtils.toString(httpResponse.getEntity(), ENCODING);
+        try (CloseableHttpResponse httpResponse = httpClient.execute(httpMethod)) {
+            // get return result
+            if (httpResponse != null && httpResponse.getStatusLine() != null) {
+                String content = "";
+                if (httpResponse.getEntity() != null) {
+                    content = EntityUtils.toString(httpResponse.getEntity(), ENCODING);
+                }
+                return new HttpClientResult(httpResponse.getStatusLine().getStatusCode(), content);
             }
-            return new HttpClientResult(httpResponse.getStatusLine().getStatusCode(), content);
         }
         return new HttpClientResult(HttpStatus.SC_INTERNAL_SERVER_ERROR);
     }
@@ -322,7 +296,7 @@ public class HttpClientUtils {
      * release resources
      *
      * @param httpResponse http response object
-     * @param httpClient http client objet
+     * @param httpClient   http client object
      * @throws IOException information
      */
     public static void release(CloseableHttpResponse httpResponse, CloseableHttpClient httpClient) throws IOException {
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java
index a97d990f..97f0f882 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java
@@ -45,11 +45,6 @@ public class FlinkTaskExecuteCommand extends BaseTaskExecuteCommand<FlinkCommand
 
         Config config = new ConfigBuilder<>(configFile, engine).getConfig();
         ExecutionContext<FlinkEnvironment> executionContext = new ExecutionContext<>(config, engine);
-        Execution<BaseSource<FlinkEnvironment>,
-            BaseTransform<FlinkEnvironment>,
-            BaseSink<FlinkEnvironment>,
-            FlinkEnvironment> execution = new ExecutionFactory<>(executionContext).createExecution();
-
         List<BaseSource<FlinkEnvironment>> sources = executionContext.getSources();
         List<BaseTransform<FlinkEnvironment>> transforms = executionContext.getTransforms();
         List<BaseSink<FlinkEnvironment>> sinks = executionContext.getSinks();
@@ -57,7 +52,10 @@ public class FlinkTaskExecuteCommand extends BaseTaskExecuteCommand<FlinkCommand
         baseCheckConfig(sinks, transforms, sinks);
         showAsciiLogo();
 
-        try {
+        try (Execution<BaseSource<FlinkEnvironment>,
+                BaseTransform<FlinkEnvironment>,
+                BaseSink<FlinkEnvironment>,
+                FlinkEnvironment> execution = new ExecutionFactory<>(executionContext).createExecution()) {
             prepare(executionContext.getEnvironment(), sources, transforms, sinks);
             execution.start(sources, transforms, sinks);
             close(sources, transforms, sinks);
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkTaskExecuteCommand.java
index 854ec24d..f1cfc2b6 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkTaskExecuteCommand.java
@@ -47,14 +47,13 @@ public class SparkTaskExecuteCommand extends BaseTaskExecuteCommand<SparkCommand
         List<BaseTransform<SparkEnvironment>> transforms = executionContext.getTransforms();
         List<BaseSink<SparkEnvironment>> sinks = executionContext.getSinks();
 
-        Execution<
-            BaseSource<SparkEnvironment>,
-            BaseTransform<SparkEnvironment>,
-            BaseSink<SparkEnvironment>, SparkEnvironment> execution = new ExecutionFactory<>(executionContext).createExecution();
         baseCheckConfig(sources, transforms, sinks);
         showAsciiLogo();
 
-        try {
+        try (Execution<
+                BaseSource<SparkEnvironment>,
+                BaseTransform<SparkEnvironment>,
+                BaseSink<SparkEnvironment>, SparkEnvironment> execution = new ExecutionFactory<>(executionContext).createExecution()) {
             prepare(executionContext.getEnvironment(), sources, transforms, sinks);
             execution.start(sources, transforms, sinks);
             close(sources, transforms, sinks);
diff --git a/seatunnel-dist/release-docs/LICENSE b/seatunnel-dist/release-docs/LICENSE
index a2e98777..7626369c 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -821,7 +821,7 @@ The text of each license is the standard Apache 2.0 license.
      (The Apache Software License, Version 2.0) htrace-core (org.apache.htrace:htrace-core:3.1.0-incubating - http://incubator.apache.org/projects/htrace.html)
      (The Apache Software License, Version 2.0) htrace-core (org.htrace:htrace-core:3.0.4 - https://github.com/cloudera/htrace)
      (The Apache Software License, Version 2.0) htrace-core4 (org.apache.htrace:htrace-core4:4.1.0-incubating - http://incubator.apache.org/projects/htrace.html)
-     (The Apache Software License, Version 2.0) jackson-databind (com.fasterxml.jackson.core:jackson-databind:2.11.0 - http://github.com/FasterXML/jackson)
+     (The Apache Software License, Version 2.0) jackson-databind (com.fasterxml.jackson.core:jackson-databind:2.12.6.1 - http://github.com/FasterXML/jackson)
      (The Apache Software License, Version 2.0) jackson-module-scala (com.fasterxml.jackson.module:jackson-module-scala_2.11:2.6.7.1 - http://wiki.fasterxml.com/JacksonModuleScala)
      (The Apache Software License, Version 2.0) javax.inject (javax.inject:javax.inject:1 - http://code.google.com/p/atinject/)
      (The Apache Software License, Version 2.0) lang-mustache (org.elasticsearch.plugin:lang-mustache-client:7.5.1 - https://github.com/elastic/elasticsearch)
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index 63239b49..039645a0 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -295,7 +295,7 @@ jackson-annotations-2.11.0.jar
 jackson-core-2.11.0.jar
 jackson-core-asl-1.9.13.jar
 jackson-core-asl-1.9.2.jar
-jackson-databind-2.11.0.jar
+jackson-databind-2.12.6.1.jar
 jackson-dataformat-cbor-2.12.3.jar
 jackson-dataformat-cbor-2.8.11.jar
 jackson-dataformat-smile-2.10.5.jar