Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/05/22 05:16:08 UTC

[GitHub] [incubator-inlong] Greedyu opened a new pull request, #4291: [INLONG-4232][Manager] Support collect data from a specified position for MySQL binlog

Greedyu opened a new pull request, #4291:
URL: https://github.com/apache/incubator-inlong/pull/4291

   Fixes #4232
   1. Manager supports collecting data from a specified position in the MySQL binlog.
   2. Manager-client-tool supports creating groups.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] Greedyu commented on a diff in pull request #4291: [INLONG-4232][Manager] Support collect data from a specified position for MySQL binlog

Posted by GitBox <gi...@apache.org>.
Greedyu commented on code in PR #4291:
URL: https://github.com/apache/inlong/pull/4291#discussion_r908069935


##########
inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java:
##########
@@ -204,24 +204,26 @@ private void initOrUpdateSource() {
         final String streamId = streamInfo.getInlongStreamId();
         List<SourceListResponse> sourceListResponses = managerClient.listSources(groupId, streamId);
         List<String> updateSourceNames = Lists.newArrayList();
-        for (SourceListResponse sourceListResponse : sourceListResponses) {
-            final String sourceName = sourceListResponse.getSourceName();
-            final int id = sourceListResponse.getId();
-            if (sourceRequests.get(sourceName) == null) {
-                boolean isDelete = managerClient.deleteSource(id);
-                if (!isDelete) {
-                    throw new RuntimeException(String.format("Delete source=%s failed", sourceListResponse));
-                }
-            } else {
-                SourceRequest sourceRequest = sourceRequests.get(sourceName);
-                sourceRequest.setId(id);
-                Pair<Boolean, String> updateState = managerClient.updateSource(sourceRequest);
-                if (!updateState.getKey()) {
-                    throw new RuntimeException(String.format("Update source=%s failed with err=%s", sourceRequest,
-                            updateState.getValue()));
+        if (sourceListResponses != null && sourceListResponses.size() > 0) {
+            for (SourceListResponse sourceListResponse : sourceListResponses) {
+                final String sourceName = sourceListResponse.getSourceName();
+                final int id = sourceListResponse.getId();
+                if (sourceRequests.get(sourceName) == null) {
+                    boolean isDelete = managerClient.deleteSource(id);
+                    if (!isDelete) {
+                        throw new RuntimeException(String.format("Delete source=%s failed", sourceListResponse));

Review Comment:
   Resolved.



##########
inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sort/BaseSortConf.java:
##########
@@ -25,14 +27,21 @@
  */
 @Data
 @ApiModel("Sort configuration for inlong group")
+@JsonTypeInfo(use = Id.NAME, visible = true, property = "sortTypeString")
 public abstract class BaseSortConf {
 
+    public String sortTypeString;

Review Comment:
   Resolved.





[GitHub] [inlong] Greedyu commented on a diff in pull request #4291: [INLONG-4232][Manager] Support collect data from a specified position for MySQL binlog

Posted by GitBox <gi...@apache.org>.
Greedyu commented on code in PR #4291:
URL: https://github.com/apache/inlong/pull/4291#discussion_r908069442


##########
inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java:
##########
@@ -204,24 +204,26 @@ private void initOrUpdateSource() {
         final String streamId = streamInfo.getInlongStreamId();
         List<SourceListResponse> sourceListResponses = managerClient.listSources(groupId, streamId);
         List<String> updateSourceNames = Lists.newArrayList();
-        for (SourceListResponse sourceListResponse : sourceListResponses) {
-            final String sourceName = sourceListResponse.getSourceName();
-            final int id = sourceListResponse.getId();
-            if (sourceRequests.get(sourceName) == null) {
-                boolean isDelete = managerClient.deleteSource(id);
-                if (!isDelete) {
-                    throw new RuntimeException(String.format("Delete source=%s failed", sourceListResponse));
-                }
-            } else {
-                SourceRequest sourceRequest = sourceRequests.get(sourceName);
-                sourceRequest.setId(id);
-                Pair<Boolean, String> updateState = managerClient.updateSource(sourceRequest);
-                if (!updateState.getKey()) {
-                    throw new RuntimeException(String.format("Update source=%s failed with err=%s", sourceRequest,
-                            updateState.getValue()));
+        if (sourceListResponses != null && sourceListResponses.size() > 0) {

Review Comment:
   Resolved.





[GitHub] [incubator-inlong] EMsnap commented on a diff in pull request #4291: [INLONG-4232][Manager] Support collect data from a specified position for MySQL binlog

Posted by GitBox <gi...@apache.org>.
EMsnap commented on code in PR #4291:
URL: https://github.com/apache/incubator-inlong/pull/4291#discussion_r890709767


##########
inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CommandCreate.java:
##########
@@ -71,7 +71,7 @@ void run() {
                 }
             } catch (Exception e) {
                 System.out.println("Create group failed!");
-                System.out.println(e.getMessage());
+                e.printStackTrace();

Review Comment:
   Printing the stack trace is not appropriate here.





[GitHub] [inlong] healchow commented on a diff in pull request #4291: [INLONG-4232][Manager] Support collect data from a specified position for MySQL binlog

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #4291:
URL: https://github.com/apache/inlong/pull/4291#discussion_r907987980


##########
inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java:
##########
@@ -204,24 +204,26 @@ private void initOrUpdateSource() {
         final String streamId = streamInfo.getInlongStreamId();
         List<SourceListResponse> sourceListResponses = managerClient.listSources(groupId, streamId);
         List<String> updateSourceNames = Lists.newArrayList();
-        for (SourceListResponse sourceListResponse : sourceListResponses) {
-            final String sourceName = sourceListResponse.getSourceName();
-            final int id = sourceListResponse.getId();
-            if (sourceRequests.get(sourceName) == null) {
-                boolean isDelete = managerClient.deleteSource(id);
-                if (!isDelete) {
-                    throw new RuntimeException(String.format("Delete source=%s failed", sourceListResponse));
-                }
-            } else {
-                SourceRequest sourceRequest = sourceRequests.get(sourceName);
-                sourceRequest.setId(id);
-                Pair<Boolean, String> updateState = managerClient.updateSource(sourceRequest);
-                if (!updateState.getKey()) {
-                    throw new RuntimeException(String.format("Update source=%s failed with err=%s", sourceRequest,
-                            updateState.getValue()));
+        if (sourceListResponses != null && sourceListResponses.size() > 0) {
+            for (SourceListResponse sourceListResponse : sourceListResponses) {
+                final String sourceName = sourceListResponse.getSourceName();
+                final int id = sourceListResponse.getId();
+                if (sourceRequests.get(sourceName) == null) {
+                    boolean isDelete = managerClient.deleteSource(id);
+                    if (!isDelete) {
+                        throw new RuntimeException(String.format("Delete source=%s failed", sourceListResponse));

Review Comment:
   Suggest changing the exception message to `throw new RuntimeException(String.format("Delete source failed by id=%s", id));`, thanks.





[GitHub] [incubator-inlong] healchow commented on a diff in pull request #4291: [INLONG-4232][Manager] Support collect data from a specified position for MySQL binlog

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #4291:
URL: https://github.com/apache/incubator-inlong/pull/4291#discussion_r879964449


##########
inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java:
##########
@@ -100,4 +100,10 @@ public class MySQLBinlogSource extends StreamSource {
 
     @ApiModelProperty(value = "Primary key must be shared by all tables", required = false)
     private String primaryKey;
+
+    @ApiModelProperty("Directly read binlog from the specified offset filename")
+    public String specificOffsetFile;

Review Comment:
   Maybe rename these to `offsetFile` and `offsetPosition`, which are more readable.



##########
inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CommandCreate.java:
##########
@@ -71,7 +71,7 @@ void run() {
                 }
             } catch (Exception e) {
                 System.out.println("Create group failed!");
-                System.out.println(e.getMessage());

Review Comment:
   Just printing the message of the exception is enough, thanks.



##########
inlong-manager/manager-client-tools/src/test/resources/create_group.json:
##########
@@ -0,0 +1,98 @@
+{
+  "groupInfo": {
+    "groupName": "test_group14",
+    "inlongGroupId": "b_test_group45",
+    "description": "",
+    "inCharges": "admin",
+    "proxyClusterId": "2",
+    "mqType": "PULSAR",
+    "mqBaseConf": {

Review Comment:
   The MQ config has changed; please update it to match the latest branch, thanks.





[GitHub] [inlong] healchow commented on a diff in pull request #4291: [INLONG-4232][Manager] Support collect data from a specified position for MySQL binlog

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #4291:
URL: https://github.com/apache/inlong/pull/4291#discussion_r907989725


##########
inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sort/BaseSortConf.java:
##########
@@ -25,14 +27,21 @@
  */
 @Data
 @ApiModel("Sort configuration for inlong group")
+@JsonTypeInfo(use = Id.NAME, visible = true, property = "sortTypeString")
 public abstract class BaseSortConf {
 
+    public String sortTypeString;

Review Comment:
   Suggest changing this to `sortType`.





[GitHub] [inlong] healchow commented on a diff in pull request #4291: [INLONG-4232][Manager] Support collect data from a specified position for MySQL binlog

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #4291:
URL: https://github.com/apache/inlong/pull/4291#discussion_r907986195


##########
inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java:
##########
@@ -204,24 +204,26 @@ private void initOrUpdateSource() {
         final String streamId = streamInfo.getInlongStreamId();
         List<SourceListResponse> sourceListResponses = managerClient.listSources(groupId, streamId);
         List<String> updateSourceNames = Lists.newArrayList();
-        for (SourceListResponse sourceListResponse : sourceListResponses) {
-            final String sourceName = sourceListResponse.getSourceName();
-            final int id = sourceListResponse.getId();
-            if (sourceRequests.get(sourceName) == null) {
-                boolean isDelete = managerClient.deleteSource(id);
-                if (!isDelete) {
-                    throw new RuntimeException(String.format("Delete source=%s failed", sourceListResponse));
-                }
-            } else {
-                SourceRequest sourceRequest = sourceRequests.get(sourceName);
-                sourceRequest.setId(id);
-                Pair<Boolean, String> updateState = managerClient.updateSource(sourceRequest);
-                if (!updateState.getKey()) {
-                    throw new RuntimeException(String.format("Update source=%s failed with err=%s", sourceRequest,
-                            updateState.getValue()));
+        if (sourceListResponses != null && sourceListResponses.size() > 0) {

Review Comment:
   Please use the `CollectionUtils.isNotEmpty` method to check whether the list is empty or not.
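
   A minimal sketch of the null-safe check suggested above. To stay self-contained, it uses a hypothetical local helper `isNotEmpty` that mirrors what `CollectionUtils.isNotEmpty` from Apache Commons Collections does (true only for a non-null collection with at least one element). The class and method names here are illustrative assumptions, not code from this PR.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class ListCheckSketch {

    // Hypothetical stand-in for CollectionUtils.isNotEmpty: non-null and not empty.
    static boolean isNotEmpty(Collection<?> coll) {
        return coll != null && !coll.isEmpty();
    }

    // Counts the entries that would be processed; a null or empty list is skipped.
    static int countProcessed(List<String> sourceNames) {
        int processed = 0;
        if (isNotEmpty(sourceNames)) {
            for (String name : sourceNames) {
                processed++;
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(countProcessed(null));
        System.out.println(countProcessed(Collections.emptyList()));
        System.out.println(countProcessed(Arrays.asList("a", "b")));
    }
}
```

   In the real code, the library call would replace the local helper, so the `!= null && size() > 0` condition collapses into a single readable check.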





[GitHub] [inlong] healchow merged pull request #4291: [INLONG-4232][Agent][Manager] Support collect data from a specified position for MySQL binlog

Posted by GitBox <gi...@apache.org>.
healchow merged PR #4291:
URL: https://github.com/apache/inlong/pull/4291




[GitHub] [inlong] Greedyu commented on a diff in pull request #4291: [INLONG-4232][Manager] Support collect data from a specified position for MySQL binlog

Posted by GitBox <gi...@apache.org>.
Greedyu commented on code in PR #4291:
URL: https://github.com/apache/inlong/pull/4291#discussion_r906915773


##########
inlong-manager/manager-client-tools/src/test/resources/create_group.json:
##########
@@ -0,0 +1,98 @@
+{
+  "groupInfo": {
+    "groupName": "test_group14",
+    "inlongGroupId": "b_test_group45",
+    "description": "",
+    "inCharges": "admin",
+    "proxyClusterId": "2",
+    "mqType": "PULSAR",
+    "mqBaseConf": {

Review Comment:
   Resolved.





[GitHub] [inlong] Greedyu commented on a diff in pull request #4291: [INLONG-4232][Manager] Support collect data from a specified position for MySQL binlog

Posted by GitBox <gi...@apache.org>.
Greedyu commented on code in PR #4291:
URL: https://github.com/apache/inlong/pull/4291#discussion_r908070977


##########
inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CommandCreate.java:
##########
@@ -71,7 +71,7 @@ void run() {
                 }
             } catch (Exception e) {
                 System.out.println("Create group failed!");
-                System.out.println(e.getMessage());
+                e.printStackTrace();

Review Comment:
   Changed to output via the Logger.





[GitHub] [incubator-inlong] Greedyu commented on a diff in pull request #4291: [INLONG-4232][Manager] Support collect data from a specified position for MySQL binlog

Posted by GitBox <gi...@apache.org>.
Greedyu commented on code in PR #4291:
URL: https://github.com/apache/incubator-inlong/pull/4291#discussion_r880003885


##########
inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java:
##########
@@ -100,4 +100,10 @@ public class MySQLBinlogSource extends StreamSource {
 
     @ApiModelProperty(value = "Primary key must be shared by all tables", required = false)
     private String primaryKey;
+
+    @ApiModelProperty("Directly read binlog from the specified offset filename")
+    public String specificOffsetFile;

Review Comment:
   The names follow Flink CDC, which uses `specificOffsetFile` and `specificOffsetPos`; the agent module also uses these names.
   ![image](https://user-images.githubusercontent.com/20356765/169936098-71b02658-1c31-4b63-abb2-09e7c5f9f25f.png)
   



##########
inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CommandCreate.java:
##########
@@ -71,7 +71,7 @@ void run() {
                 }
             } catch (Exception e) {
                 System.out.println("Create group failed!");
-                System.out.println(e.getMessage());

Review Comment:
   The main reason is that `e.getMessage()` does not show the exception chain, so it does not provide enough information about the failure.
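
   A small sketch of the difference discussed here, under stated assumptions: `createGroup` is a hypothetical stand-in for the failing call, and `java.util.logging` is used only to keep the example self-contained (the project may well use a different logging facade). It shows that `getMessage()` on a wrapping exception returns only the outer message, while passing the exception to a logger keeps the full cause chain.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class ExceptionOutputSketch {

    private static final Logger LOGGER = Logger.getLogger(ExceptionOutputSketch.class.getName());

    // Hypothetical failing call: wraps a low-level error in a higher-level exception.
    static void createGroup() {
        try {
            throw new IllegalStateException("connection refused");
        } catch (IllegalStateException cause) {
            throw new RuntimeException("Create group failed", cause);
        }
    }

    // Walks the cause chain and returns the message of the innermost exception.
    static String rootCauseMessage(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current.getMessage();
    }

    public static void main(String[] args) {
        try {
            createGroup();
        } catch (Exception e) {
            // Only the outer message is printed; the underlying cause is not visible.
            System.out.println(e.getMessage());
            // Passing the exception to the logger records the stack trace and causes.
            LOGGER.log(Level.SEVERE, "Create group failed!", e);
        }
    }
}
```

   Logging the exception object addresses both review concerns: nothing is written through a bare `printStackTrace()`, and the cause chain is still available for diagnosis.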


