You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/04/12 11:11:11 UTC
[incubator-seatunnel] branch dev updated: [WIP]code cleanup (#1684)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 8e90c5ed [WIP]code cleanup (#1684)
8e90c5ed is described below
commit 8e90c5ede9512faa0d7bec8026f39f2cf3b788c1
Author: Kirs <ki...@apache.org>
AuthorDate: Tue Apr 12 19:11:06 2022 +0800
[WIP]code cleanup (#1684)
* [WIP]code cleanup
* improve resource close method and optimize some methods
* smell
* Bump jackson-databind from 2.11.0 to 2.12.6.1
---
pom.xml | 3 +-
.../seatunnel/flink/jdbc/source/JdbcSource.java | 8 +-
.../spark/http/source/util/HttpClientUtils.java | 98 ++++++++--------------
.../command/flink/FlinkTaskExecuteCommand.java | 10 +--
.../command/spark/SparkTaskExecuteCommand.java | 9 +-
seatunnel-dist/release-docs/LICENSE | 2 +-
tools/dependencies/known-dependencies.txt | 2 +-
7 files changed, 52 insertions(+), 80 deletions(-)
diff --git a/pom.xml b/pom.xml
index 02da41ce..d91947be 100644
--- a/pom.xml
+++ b/pom.xml
@@ -101,6 +101,7 @@
<hadoop.version>2.7.5</hadoop.version>
<fastjson.version>1.2.60</fastjson.version>
<jackson.version>2.11.0</jackson.version>
+ <jackson-databind.version>2.12.6.1 </jackson-databind.version>
<lombok.version>1.18.0</lombok.version>
<mysql.version>8.0.16</mysql.version>
<postgresql.version>42.3.3</postgresql.version>
@@ -502,7 +503,7 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
- <version>${jackson.version}</version>
+ <version>${jackson-databind.version}</version>
</dependency>
<dependency>
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java
index 96fdc211..cb603884 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java
@@ -193,9 +193,10 @@ public class JdbcSource implements FlinkBatchSource {
if (config.hasPath(PARTITION_UPPER_BOUND) && config.hasPath(PARTITION_LOWER_BOUND)) {
max = config.getLong(PARTITION_UPPER_BOUND);
min = config.getLong(PARTITION_LOWER_BOUND);
- } else {
- ResultSet rs = connection.createStatement().executeQuery(String.format("SELECT MAX(%s),MIN(%s) " +
- "FROM %s", columnName, columnName, tableName));
+ return new JdbcNumericBetweenParametersProvider(min, max).ofBatchNum(parallelism * 2);
+ }
+ try (ResultSet rs = connection.createStatement().executeQuery(String.format("SELECT MAX(%s),MIN(%s) " +
+ "FROM %s", columnName, columnName, tableName))) {
if (rs.next()) {
max = config.hasPath(PARTITION_UPPER_BOUND) ? config.getLong(PARTITION_UPPER_BOUND) :
Long.parseLong(rs.getString(1));
@@ -203,7 +204,6 @@ public class JdbcSource implements FlinkBatchSource {
Long.parseLong(rs.getString(2));
}
}
-
return new JdbcNumericBetweenParametersProvider(min, max).ofBatchNum(parallelism * 2);
}
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-http/src/main/java/org/apache/seatunnel/spark/http/source/util/HttpClientUtils.java b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-http/src/main/java/org/apache/seatunnel/spark/http/source/util/HttpClientUtils.java
index ffd1947b..8dd3c6bd 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-http/src/main/java/org/apache/seatunnel/spark/http/source/util/HttpClientUtils.java
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-http/src/main/java/org/apache/seatunnel/spark/http/source/util/HttpClientUtils.java
@@ -45,6 +45,10 @@ import java.util.Set;
public class HttpClientUtils {
+ private HttpClientUtils() {
+ throw new IllegalStateException("Utility class");
+ }
+
private static final String ENCODING = "UTF-8";
private static final int CONNECT_TIMEOUT = 6000 * 2;
private static final int SOCKET_TIMEOUT = 6000 * 10;
@@ -83,9 +87,6 @@ public class HttpClientUtils {
* @throws Exception information
*/
public static HttpClientResult doGet(String url, Map<String, String> headers, Map<String, String> params) throws Exception {
- // Create httpClient object
- CloseableHttpClient httpClient = HttpClients.createDefault();
-
// Create access address
URIBuilder uriBuilder = new URIBuilder(url);
if (params != null) {
@@ -95,27 +96,19 @@ public class HttpClientUtils {
}
}
- HttpGet httpGet = new HttpGet(uriBuilder.build());
/**
* setConnectTimeout:Set the connection timeout, in milliseconds.
* setSocketTimeout:The timeout period (ie response time) for requesting data acquisition, in milliseconds.
* If an interface is accessed, and the data cannot be returned within a certain amount of time, the call is simply abandoned.
*/
RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
+ HttpGet httpGet = new HttpGet(uriBuilder.build());
httpGet.setConfig(requestConfig);
-
- // set request header
- packageHeader(headers, httpGet);
-
- // Create httpResponse object
- CloseableHttpResponse httpResponse = null;
-
- try {
+ try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
+ // set request header
+ packageHeader(headers, httpGet);
// Execute the request and get the response result
- return getHttpClientResult(httpResponse, httpClient, httpGet);
- } finally {
- // release resources
- release(httpResponse, httpClient);
+ return getHttpClientResult(httpClient, httpGet);
}
}
@@ -152,8 +145,6 @@ public class HttpClientUtils {
* @throws Exception information
*/
public static HttpClientResult doPost(String url, Map<String, String> headers, Map<String, String> params) throws Exception {
- // Create httpClient object
- CloseableHttpClient httpClient = HttpClients.createDefault();
HttpPost httpPost = new HttpPost(url);
/**
* setConnectTimeout:Set the connection timeout, in milliseconds.
@@ -168,15 +159,9 @@ public class HttpClientUtils {
// Encapsulate request parameters
packageParam(params, httpPost);
- // Create httpResponse object
- CloseableHttpResponse httpResponse = null;
-
- try {
+ try (CloseableHttpClient httpClient = HttpClients.createDefault();) {
// Execute the request and get the response result
- return getHttpClientResult(httpResponse, httpClient, httpPost);
- } finally {
- // release resources
- release(httpResponse, httpClient);
+ return getHttpClientResult(httpClient, httpPost);
}
}
@@ -192,7 +177,7 @@ public class HttpClientUtils {
}
/**
- *Send a put request with request parameters
+ * Send a put request with request parameters
*
* @param url request address
* @param params request parameter map
@@ -200,19 +185,15 @@ public class HttpClientUtils {
* @throws Exception information
*/
public static HttpClientResult doPut(String url, Map<String, String> params) throws Exception {
- CloseableHttpClient httpClient = HttpClients.createDefault();
+
HttpPut httpPut = new HttpPut(url);
RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
httpPut.setConfig(requestConfig);
packageParam(params, httpPut);
- CloseableHttpResponse httpResponse = null;
-
- try {
- return getHttpClientResult(httpResponse, httpClient, httpPut);
- } finally {
- release(httpResponse, httpClient);
+ try (CloseableHttpClient httpClient = HttpClients.createDefault();) {
+ return getHttpClientResult(httpClient, httpPut);
}
}
@@ -224,16 +205,12 @@ public class HttpClientUtils {
* @throws Exception information
*/
public static HttpClientResult doDelete(String url) throws Exception {
- CloseableHttpClient httpClient = HttpClients.createDefault();
+
HttpDelete httpDelete = new HttpDelete(url);
RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
httpDelete.setConfig(requestConfig);
-
- CloseableHttpResponse httpResponse = null;
- try {
- return getHttpClientResult(httpResponse, httpClient, httpDelete);
- } finally {
- release(httpResponse, httpClient);
+ try (CloseableHttpClient httpClient = HttpClients.createDefault();) {
+ return getHttpClientResult(httpClient, httpDelete);
}
}
@@ -247,7 +224,7 @@ public class HttpClientUtils {
*/
public static HttpClientResult doDelete(String url, Map<String, String> params) throws Exception {
if (params == null) {
- params = new HashMap<String, String>(INITIAL_CAPACITY);
+ params = new HashMap<>(INITIAL_CAPACITY);
}
params.put("_method", "delete");
@@ -255,9 +232,9 @@ public class HttpClientUtils {
}
/**
- *encapsulate request header
+ * encapsulate request header
*
- * @param params request header map
+ * @param params request header map
* @param httpMethod http request method
*/
public static void packageHeader(Map<String, String> params, HttpRequestBase httpMethod) {
@@ -274,15 +251,14 @@ public class HttpClientUtils {
/**
* Encapsulate request parameters
*
- * @param params request parameter map
+ * @param params request parameter map
* @param httpMethod http request method
* @throws UnsupportedEncodingException exception information
*/
- public static void packageParam(Map<String, String> params, HttpEntityEnclosingRequestBase httpMethod)
- throws UnsupportedEncodingException {
+ public static void packageParam(Map<String, String> params, HttpEntityEnclosingRequestBase httpMethod) throws UnsupportedEncodingException {
// Encapsulate request parameters
if (params != null) {
- List<NameValuePair> nvps = new ArrayList<NameValuePair>();
+ List<NameValuePair> nvps = new ArrayList<>();
Set<Entry<String, String>> entrySet = params.entrySet();
for (Entry<String, String> entry : entrySet) {
nvps.add(new BasicNameValuePair(entry.getKey(), entry.getValue()));
@@ -296,24 +272,22 @@ public class HttpClientUtils {
/**
* get response result
*
- * @param httpResponse http response object
- * @param httpClient http client object
- * @param httpMethod http method onject
+ * @param httpClient http client object
+ * @param httpMethod http method onject
* @return http response result
* @throws Exception information
*/
- public static HttpClientResult getHttpClientResult(CloseableHttpResponse httpResponse,
- CloseableHttpClient httpClient, HttpRequestBase httpMethod) throws Exception {
+ public static HttpClientResult getHttpClientResult(CloseableHttpClient httpClient, HttpRequestBase httpMethod) throws Exception {
// execute request
- httpResponse = httpClient.execute(httpMethod);
-
- // get return result
- if (httpResponse != null && httpResponse.getStatusLine() != null) {
- String content = "";
- if (httpResponse.getEntity() != null) {
- content = EntityUtils.toString(httpResponse.getEntity(), ENCODING);
+ try (CloseableHttpResponse httpResponse = httpClient.execute(httpMethod)) {
+ // get return result
+ if (httpResponse != null && httpResponse.getStatusLine() != null) {
+ String content = "";
+ if (httpResponse.getEntity() != null) {
+ content = EntityUtils.toString(httpResponse.getEntity(), ENCODING);
+ }
+ return new HttpClientResult(httpResponse.getStatusLine().getStatusCode(), content);
}
- return new HttpClientResult(httpResponse.getStatusLine().getStatusCode(), content);
}
return new HttpClientResult(HttpStatus.SC_INTERNAL_SERVER_ERROR);
}
@@ -322,7 +296,7 @@ public class HttpClientUtils {
* release resources
*
* @param httpResponse http response object
- * @param httpClient http client objet
+ * @param httpClient http client objet
* @throws IOException information
*/
public static void release(CloseableHttpResponse httpResponse, CloseableHttpClient httpClient) throws IOException {
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java
index a97d990f..97f0f882 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java
@@ -45,11 +45,6 @@ public class FlinkTaskExecuteCommand extends BaseTaskExecuteCommand<FlinkCommand
Config config = new ConfigBuilder<>(configFile, engine).getConfig();
ExecutionContext<FlinkEnvironment> executionContext = new ExecutionContext<>(config, engine);
- Execution<BaseSource<FlinkEnvironment>,
- BaseTransform<FlinkEnvironment>,
- BaseSink<FlinkEnvironment>,
- FlinkEnvironment> execution = new ExecutionFactory<>(executionContext).createExecution();
-
List<BaseSource<FlinkEnvironment>> sources = executionContext.getSources();
List<BaseTransform<FlinkEnvironment>> transforms = executionContext.getTransforms();
List<BaseSink<FlinkEnvironment>> sinks = executionContext.getSinks();
@@ -57,7 +52,10 @@ public class FlinkTaskExecuteCommand extends BaseTaskExecuteCommand<FlinkCommand
baseCheckConfig(sinks, transforms, sinks);
showAsciiLogo();
- try {
+ try (Execution<BaseSource<FlinkEnvironment>,
+ BaseTransform<FlinkEnvironment>,
+ BaseSink<FlinkEnvironment>,
+ FlinkEnvironment> execution = new ExecutionFactory<>(executionContext).createExecution()) {
prepare(executionContext.getEnvironment(), sources, transforms, sinks);
execution.start(sources, transforms, sinks);
close(sources, transforms, sinks);
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkTaskExecuteCommand.java
index 854ec24d..f1cfc2b6 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkTaskExecuteCommand.java
@@ -47,14 +47,13 @@ public class SparkTaskExecuteCommand extends BaseTaskExecuteCommand<SparkCommand
List<BaseTransform<SparkEnvironment>> transforms = executionContext.getTransforms();
List<BaseSink<SparkEnvironment>> sinks = executionContext.getSinks();
- Execution<
- BaseSource<SparkEnvironment>,
- BaseTransform<SparkEnvironment>,
- BaseSink<SparkEnvironment>, SparkEnvironment> execution = new ExecutionFactory<>(executionContext).createExecution();
baseCheckConfig(sources, transforms, sinks);
showAsciiLogo();
- try {
+ try (Execution<
+ BaseSource<SparkEnvironment>,
+ BaseTransform<SparkEnvironment>,
+ BaseSink<SparkEnvironment>, SparkEnvironment> execution = new ExecutionFactory<>(executionContext).createExecution()) {
prepare(executionContext.getEnvironment(), sources, transforms, sinks);
execution.start(sources, transforms, sinks);
close(sources, transforms, sinks);
diff --git a/seatunnel-dist/release-docs/LICENSE b/seatunnel-dist/release-docs/LICENSE
index a2e98777..7626369c 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -821,7 +821,7 @@ The text of each license is the standard Apache 2.0 license.
(The Apache Software License, Version 2.0) htrace-core (org.apache.htrace:htrace-core:3.1.0-incubating - http://incubator.apache.org/projects/htrace.html)
(The Apache Software License, Version 2.0) htrace-core (org.htrace:htrace-core:3.0.4 - https://github.com/cloudera/htrace)
(The Apache Software License, Version 2.0) htrace-core4 (org.apache.htrace:htrace-core4:4.1.0-incubating - http://incubator.apache.org/projects/htrace.html)
- (The Apache Software License, Version 2.0) jackson-databind (com.fasterxml.jackson.core:jackson-databind:2.11.0 - http://github.com/FasterXML/jackson)
+ (The Apache Software License, Version 2.0) jackson-databind (com.fasterxml.jackson.core:jackson-databind:2.12.6.1 - http://github.com/FasterXML/jackson)
(The Apache Software License, Version 2.0) jackson-module-scala (com.fasterxml.jackson.module:jackson-module-scala_2.11:2.6.7.1 - http://wiki.fasterxml.com/JacksonModuleScala)
(The Apache Software License, Version 2.0) javax.inject (javax.inject:javax.inject:1 - http://code.google.com/p/atinject/)
(The Apache Software License, Version 2.0) lang-mustache (org.elasticsearch.plugin:lang-mustache-client:7.5.1 - https://github.com/elastic/elasticsearch)
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index 63239b49..039645a0 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -295,7 +295,7 @@ jackson-annotations-2.11.0.jar
jackson-core-2.11.0.jar
jackson-core-asl-1.9.13.jar
jackson-core-asl-1.9.2.jar
-jackson-databind-2.11.0.jar
+jackson-databind-2.12.6.1.jar
jackson-dataformat-cbor-2.12.3.jar
jackson-dataformat-cbor-2.8.11.jar
jackson-dataformat-smile-2.10.5.jar