You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by "TaoZex (via GitHub)" <gi...@apache.org> on 2023/05/16 14:12:01 UTC

[GitHub] [incubator-seatunnel] TaoZex commented on a diff in pull request #4752: [Improve][Connector-V2][StarRocks]StarRocks Sink connector support 2pc and eos

TaoZex commented on code in PR #4752:
URL: https://github.com/apache/incubator-seatunnel/pull/4752#discussion_r1195202448


##########
docs/en/connector-v2/sink/StarRocks.md:
##########
@@ -89,6 +99,14 @@ The amount of time to wait before attempting to retry a request to `StarRocks`
 
 Whether to enable upsert/delete, only supports PrimaryKey model.
 
+### enable-2pc [bool]
+
+Whether to enable two-phase commit (2pc), the default is true, to ensure Exactly-Once semantics. For two-phase commit, please refer to [here](https://docs.starrocks.io/zh-cn/latest/loading/Stream_Load_transaction_interface).
+
+### flush_frequency_ms [long]
+
+trigger flush frequency for batch writing ON EOS semantics

Review Comment:
   ```suggestion
   Trigger flush frequency for batch writing on Exactly-Once semantics.
   ```



##########
seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/HttpHelper.java:
##########
@@ -77,100 +77,58 @@ public String doHttpPost(String postUrl, Map<String, String> header, String post
         }
     }
 
-    public String doHttpGet(String getUrl) throws IOException {
-        log.info("Executing GET from {}.", getUrl);
-        try (CloseableHttpClient httpclient = buildHttpClient()) {
-            HttpGet httpGet = new HttpGet(getUrl);
-            try (CloseableHttpResponse resp = httpclient.execute(httpGet)) {
-                HttpEntity respEntity = resp.getEntity();
-                if (null == respEntity) {
-                    log.warn("Request failed with empty response.");
-                    return null;
-                }
-                return EntityUtils.toString(respEntity);
+    public String doHttpExecute(HttpClientBuilder clientBuilder, HttpRequestBase httpRequestBase)
+            throws IOException {
+        if (Objects.isNull(clientBuilder)) clientBuilder = getDefaultClientBuilder();
+        try (CloseableHttpClient client = clientBuilder.build()) {
+            try (CloseableHttpResponse response = client.execute(httpRequestBase)) {
+                return parseHttpResponse(response, httpRequestBase.getMethod());
             }
         }
     }
 
-    public Map<String, Object> doHttpGet(String getUrl, Map<String, String> header)
-            throws IOException {
-        log.info("Executing GET from {}.", getUrl);
-        try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
-            HttpGet httpGet = new HttpGet(getUrl);
-            if (null != header) {
-                for (Map.Entry<String, String> entry : header.entrySet()) {
-                    httpGet.setHeader(entry.getKey(), String.valueOf(entry.getValue()));
-                }
-            }
-            try (CloseableHttpResponse resp = httpclient.execute(httpGet)) {
-                HttpEntity respEntity = getHttpEntity(resp);
-                if (null == respEntity) {
-                    log.warn("Request failed with empty response.");
-                    return null;
-                }
-                return JsonUtils.parseObject(EntityUtils.toString(respEntity), Map.class);
-            }
+    public String parseHttpResponse(CloseableHttpResponse response, String requestType)
+            throws StarRocksConnectorException {
+        int code = response.getStatusLine().getStatusCode();
+        if (307 == code) {
+            String errorMsg =
+                    String.format(
+                            "Request %s failed because http response code is 307 which means 'Temporary Redirect'. "
+                                    + "This can happen when FE responds the request slowly , you should find the reason first. The reason may be "
+                                    + "StarRocks FE/ENGINE GC, network delay, or others. response status line: %s",
+                            requestType, response.getStatusLine());
+            log.error("{}", errorMsg);
+            throw new StarRocksConnectorException(FLUSH_DATA_FAILED, errorMsg);
+        } else if (200 != code) {
+            String errorMsg =
+                    String.format(
+                            "Request %s failed because http response code is not 200. response status line: %s",
+                            requestType, response.getStatusLine());
+            log.error("{}", errorMsg);
+            throw new StarRocksConnectorException(FLUSH_DATA_FAILED, errorMsg);

Review Comment:
   Advise not to print log and throw exceptions at the same time.



##########
seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/HttpHelper.java:
##########
@@ -77,100 +77,58 @@ public String doHttpPost(String postUrl, Map<String, String> header, String post
         }
     }
 
-    public String doHttpGet(String getUrl) throws IOException {
-        log.info("Executing GET from {}.", getUrl);
-        try (CloseableHttpClient httpclient = buildHttpClient()) {
-            HttpGet httpGet = new HttpGet(getUrl);
-            try (CloseableHttpResponse resp = httpclient.execute(httpGet)) {
-                HttpEntity respEntity = resp.getEntity();
-                if (null == respEntity) {
-                    log.warn("Request failed with empty response.");
-                    return null;
-                }
-                return EntityUtils.toString(respEntity);
+    public String doHttpExecute(HttpClientBuilder clientBuilder, HttpRequestBase httpRequestBase)
+            throws IOException {
+        if (Objects.isNull(clientBuilder)) clientBuilder = getDefaultClientBuilder();
+        try (CloseableHttpClient client = clientBuilder.build()) {
+            try (CloseableHttpResponse response = client.execute(httpRequestBase)) {
+                return parseHttpResponse(response, httpRequestBase.getMethod());
             }
         }
     }
 
-    public Map<String, Object> doHttpGet(String getUrl, Map<String, String> header)
-            throws IOException {
-        log.info("Executing GET from {}.", getUrl);
-        try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
-            HttpGet httpGet = new HttpGet(getUrl);
-            if (null != header) {
-                for (Map.Entry<String, String> entry : header.entrySet()) {
-                    httpGet.setHeader(entry.getKey(), String.valueOf(entry.getValue()));
-                }
-            }
-            try (CloseableHttpResponse resp = httpclient.execute(httpGet)) {
-                HttpEntity respEntity = getHttpEntity(resp);
-                if (null == respEntity) {
-                    log.warn("Request failed with empty response.");
-                    return null;
-                }
-                return JsonUtils.parseObject(EntityUtils.toString(respEntity), Map.class);
-            }
+    public String parseHttpResponse(CloseableHttpResponse response, String requestType)
+            throws StarRocksConnectorException {
+        int code = response.getStatusLine().getStatusCode();
+        if (307 == code) {
+            String errorMsg =
+                    String.format(
+                            "Request %s failed because http response code is 307 which means 'Temporary Redirect'. "
+                                    + "This can happen when FE responds the request slowly , you should find the reason first. The reason may be "
+                                    + "StarRocks FE/ENGINE GC, network delay, or others. response status line: %s",
+                            requestType, response.getStatusLine());
+            log.error("{}", errorMsg);
+            throw new StarRocksConnectorException(FLUSH_DATA_FAILED, errorMsg);
+        } else if (200 != code) {
+            String errorMsg =
+                    String.format(
+                            "Request %s failed because http response code is not 200. response status line: %s",
+                            requestType, response.getStatusLine());
+            log.error("{}", errorMsg);
+            throw new StarRocksConnectorException(FLUSH_DATA_FAILED, errorMsg);
         }
-    }
 
-    @SuppressWarnings("unchecked")
-    public Map<String, Object> doHttpPut(String url, byte[] data, Map<String, String> header)
-            throws IOException {
-        final HttpClientBuilder httpClientBuilder =
-                HttpClients.custom()
-                        .setRedirectStrategy(
-                                new DefaultRedirectStrategy() {
-                                    @Override
-                                    protected boolean isRedirectable(String method) {
-                                        return true;
-                                    }
-                                });
-        try (CloseableHttpClient httpclient = httpClientBuilder.build()) {
-            HttpPut httpPut = new HttpPut(url);
-            if (null != header) {
-                for (Map.Entry<String, String> entry : header.entrySet()) {
-                    httpPut.setHeader(entry.getKey(), String.valueOf(entry.getValue()));
-                }
-            }
-            httpPut.setEntity(new ByteArrayEntity(data));
-            httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build());
-            try (CloseableHttpResponse resp = httpclient.execute(httpPut)) {
-                int code = resp.getStatusLine().getStatusCode();
-                if (HttpStatus.SC_OK != code) {
-                    String errorText;
-                    try {
-                        HttpEntity respEntity = resp.getEntity();
-                        errorText = EntityUtils.toString(respEntity);
-                    } catch (Exception err) {
-                        errorText = "find errorText failed: " + err.getMessage();
-                    }
-                    log.warn("Request failed with code:{}, err:{}", code, errorText);
-                    Map<String, Object> errorMap = new HashMap<>();
-                    errorMap.put("Status", "Fail");
-                    errorMap.put("Message", errorText);
-                    return errorMap;
-                }
-                HttpEntity respEntity = resp.getEntity();
-                if (null == respEntity) {
-                    log.warn("Request failed with empty response.");
-                    return null;
-                }
-                return JsonUtils.parseObject(EntityUtils.toString(respEntity), Map.class);
-            }
+        HttpEntity respEntity = response.getEntity();
+        if (respEntity == null) {
+            String errorMsg =
+                    String.format(
+                            "Request %s failed because response entity is null. response status line: %s",
+                            requestType, response.getStatusLine());
+            log.error("{}", errorMsg);
+            throw new StarRocksConnectorException(FLUSH_DATA_FAILED, errorMsg);

Review Comment:
   Advise not to print log and throw exceptions at the same time.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org