You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/03 02:29:22 UTC

[incubator-seatunnel] branch dev updated: [Improve][Http Connector-V2-Source] Refactor the code and make code more clearly (#2322)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a9a797ad8 [Improve][Http Connector-V2-Source] Refactor the code and make code more clearly (#2322)
a9a797ad8 is described below

commit a9a797ad857f7180f219030448dd95479bf31f96
Author: TyrantLucifer <Ty...@gmail.com>
AuthorDate: Wed Aug 3 10:29:17 2022 +0800

    [Improve][Http Connector-V2-Source] Refactor the code and make code more clearly (#2322)
    
    1. Unified the HTTP parameter class for the HTTP sink and source
    2. Renamed the HTTP plugin config option constant class from 'Config' to
    'HttpConfig' to avoid a naming conflict with the typesafe Config class
---
 .../http/config/{Config.java => HttpConfig.java}   |  5 +-
 .../seatunnel/http/config/HttpParameter.java       | 57 ++++++++++++++++++
 .../seatunnel/http/source/HttpSource.java          | 38 ++----------
 .../seatunnel/http/source/HttpSourceParameter.java | 70 ----------------------
 .../seatunnel/http/source/HttpSourceReader.java    |  9 +--
 5 files changed, 69 insertions(+), 110 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/Config.java b/seatunnel-connectors-v2/connector-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
similarity index 97%
rename from seatunnel-connectors-v2/connector-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/Config.java
rename to seatunnel-connectors-v2/connector-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
index f62b81abc..29d622619 100644
--- a/seatunnel-connectors-v2/connector-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/Config.java
+++ b/seatunnel-connectors-v2/connector-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
@@ -17,14 +17,11 @@
 
 package org.apache.seatunnel.connectors.seatunnel.http.config;
 
-public class Config {
+public class HttpConfig {
     public static final String URL = "url";
-
     public static final String METHOD = "method";
     public static final String METHOD_DEFAULT_VALUE = "GET";
-
     public static final String HEADERS = "headers";
     public static final String PARAMS = "params";
     public static final String BODY = "body";
-
 }
diff --git a/seatunnel-connectors-v2/connector-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java b/seatunnel-connectors-v2/connector-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java
new file mode 100644
index 000000000..53f1161fd
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.http.config;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** Serializable holder for the HTTP request options (url, method, headers, params, body) read from the plugin config. */
+@Data
+public class HttpParameter implements Serializable {
+    private String url;
+    private String method;
+    private Map<String, String> headers;
+    private Map<String, String> params;
+    private String body;
+    /** Fills this object from {@code pluginConfig}: url is always read, method falls back to the default, the rest are optional. */
+    public void buildWithConfig(Config pluginConfig) {
+        this.setUrl(pluginConfig.getString(HttpConfig.URL));
+        // method is optional; use the default (GET) when it is not configured
+        if (pluginConfig.hasPath(HttpConfig.METHOD)) {
+            this.setMethod(pluginConfig.getString(HttpConfig.METHOD));
+        } else {
+            this.setMethod(HttpConfig.METHOD_DEFAULT_VALUE);
+        }
+        // headers: flatten the nested config into a String -> String map (last value wins on duplicate keys)
+        if (pluginConfig.hasPath(HttpConfig.HEADERS)) {
+            this.setHeaders(pluginConfig.getConfig(HttpConfig.HEADERS).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> String.valueOf(entry.getValue().unwrapped()), (v1, v2) -> v2)));
+        }
+        // params: same flattening as headers, but stored via setParams (calling setHeaders here would overwrite the headers)
+        if (pluginConfig.hasPath(HttpConfig.PARAMS)) {
+            this.setParams(pluginConfig.getConfig(HttpConfig.PARAMS).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> String.valueOf(entry.getValue().unwrapped()), (v1, v2) -> v2)));
+        }
+        if (pluginConfig.hasPath(HttpConfig.BODY)) {
+            this.setBody(pluginConfig.getString(HttpConfig.BODY));
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java b/seatunnel-connectors-v2/connector-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
index 0b906144b..cf09193ae 100644
--- a/seatunnel-connectors-v2/connector-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
+++ b/seatunnel-connectors-v2/connector-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
@@ -17,13 +17,6 @@
 
 package org.apache.seatunnel.connectors.seatunnel.http.source;
 
-import static org.apache.seatunnel.connectors.seatunnel.http.config.Config.BODY;
-import static org.apache.seatunnel.connectors.seatunnel.http.config.Config.HEADERS;
-import static org.apache.seatunnel.connectors.seatunnel.http.config.Config.METHOD;
-import static org.apache.seatunnel.connectors.seatunnel.http.config.Config.METHOD_DEFAULT_VALUE;
-import static org.apache.seatunnel.connectors.seatunnel.http.config.Config.PARAMS;
-import static org.apache.seatunnel.connectors.seatunnel.http.config.Config.URL;
-
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.source.Boundedness;
@@ -39,17 +32,16 @@ import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
 import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
 import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig;
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import com.google.auto.service.AutoService;
 
-import java.util.Map;
-import java.util.stream.Collectors;
-
 @AutoService(SeaTunnelSource.class)
 public class HttpSource extends AbstractSingleSplitSource<SeaTunnelRow> {
-    private final HttpSourceParameter parameter = new HttpSourceParameter();
+    private final HttpParameter httpParameter = new HttpParameter();
     private SeaTunnelRowType rowType;
     private SeaTunnelContext seaTunnelContext;
 
@@ -65,29 +57,11 @@ public class HttpSource extends AbstractSingleSplitSource<SeaTunnelRow> {
 
     @Override
     public void prepare(Config pluginConfig) throws PrepareFailException {
-        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, URL);
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, HttpConfig.URL);
         if (!result.isSuccess()) {
             throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
         }
-        this.parameter.setUrl(pluginConfig.getString(URL));
-
-        if (pluginConfig.hasPath(METHOD)) {
-            this.parameter.setMethod(pluginConfig.getString(METHOD));
-        } else {
-            this.parameter.setMethod(METHOD_DEFAULT_VALUE);
-        }
-
-        if (pluginConfig.hasPath(HEADERS)) {
-            this.parameter.setHeaders(pluginConfig.getConfig(HEADERS).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> String.valueOf(entry.getValue().unwrapped()), (v1, v2) -> v2)));
-        }
-
-        if (pluginConfig.hasPath(PARAMS)) {
-            this.parameter.setHeaders(pluginConfig.getConfig(PARAMS).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> String.valueOf(entry.getValue().unwrapped()), (v1, v2) -> v2)));
-        }
-
-        if (pluginConfig.hasPath(BODY)) {
-            this.parameter.setBody(pluginConfig.getString(BODY));
-        }
+        this.httpParameter.buildWithConfig(pluginConfig);
         // TODO support user custom row type
         this.rowType = new SeaTunnelRowType(new String[]{"content"}, new SeaTunnelDataType<?>[]{BasicType.STRING_TYPE});
     }
@@ -104,6 +78,6 @@ public class HttpSource extends AbstractSingleSplitSource<SeaTunnelRow> {
 
     @Override
     public AbstractSingleSplitReader<SeaTunnelRow> createReader(SingleSplitReaderContext readerContext) throws Exception {
-        return new HttpSourceReader(this.parameter, readerContext);
+        return new HttpSourceReader(this.httpParameter, readerContext);
     }
 }
diff --git a/seatunnel-connectors-v2/connector-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceParameter.java b/seatunnel-connectors-v2/connector-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceParameter.java
deleted file mode 100644
index 5ec14f872..000000000
--- a/seatunnel-connectors-v2/connector-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceParameter.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.http.source;
-
-import java.io.Serializable;
-import java.util.Map;
-
-public class HttpSourceParameter implements Serializable {
-
-    private String url;
-    private String method;
-    private Map<String, String> headers;
-    private Map<String, String> params;
-    private String body;
-
-    public String getUrl() {
-        return url;
-    }
-
-    public void setUrl(String url) {
-        this.url = url;
-    }
-
-    public String getMethod() {
-        return method;
-    }
-
-    public void setMethod(String method) {
-        this.method = method;
-    }
-
-    public Map<String, String> getHeaders() {
-        return headers;
-    }
-
-    public void setHeaders(Map<String, String> headers) {
-        this.headers = headers;
-    }
-
-    public Map<String, String> getParams() {
-        return params;
-    }
-
-    public void setParams(Map<String, String> params) {
-        this.params = params;
-    }
-
-    public String getBody() {
-        return body;
-    }
-
-    public void setBody(String body) {
-        this.body = body;
-    }
-}
diff --git a/seatunnel-connectors-v2/connector-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java b/seatunnel-connectors-v2/connector-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
index c26bb055e..2c11ee6d2 100644
--- a/seatunnel-connectors-v2/connector-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
+++ b/seatunnel-connectors-v2/connector-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSpl
 import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
 import org.apache.seatunnel.connectors.seatunnel.http.client.HttpClientProvider;
 import org.apache.seatunnel.connectors.seatunnel.http.client.HttpResponse;
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,12 +38,12 @@ public class HttpSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(HttpSourceReader.class);
     private final SingleSplitReaderContext context;
-    private final HttpSourceParameter parameter;
+    private final HttpParameter httpParameter;
     private HttpClientProvider httpClient;
 
-    public HttpSourceReader(HttpSourceParameter parameter, SingleSplitReaderContext context) {
+    public HttpSourceReader(HttpParameter httpParameter, SingleSplitReaderContext context) {
         this.context = context;
-        this.parameter = parameter;
+        this.httpParameter = httpParameter;
     }
 
     @Override
@@ -60,7 +61,7 @@ public class HttpSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
     @Override
     public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
         try {
-            HttpResponse response = httpClient.execute(this.parameter.getUrl(), this.parameter.getMethod(), this.parameter.getHeaders(), this.parameter.getParams());
+            HttpResponse response = httpClient.execute(this.httpParameter.getUrl(), this.httpParameter.getMethod(), this.httpParameter.getHeaders(), this.httpParameter.getParams());
             if (STATUS_OK == response.getCode()) {
                 output.collect(new SeaTunnelRow(new Object[] {response.getContent()}));
                 return;