Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/05/08 13:18:23 UTC

[GitHub] [incubator-inlong] healchow opened a new pull request, #4129: [INLONG-4128][Manager] Abstracting the logic for creating Hive tables

healchow opened a new pull request, #4129:
URL: https://github.com/apache/incubator-inlong/pull/4129

   ### Title Name: [INLONG-4128][Manager] Abstracting the logic for creating Hive tables
   
   Fixes #4128 
   
   ### Motivation
   
   Abstracting the logic for creating Hive tables.
   
   ### Modifications
   
   1. Remove unnecessary code.
   2. Move the test-connection method to `DataNodeService`.
   3. Refine the functionality of the `SqlBuilder` class.
   4. Abstract a `HiveJdbcUtils` class to handle Hive resources in a unified way.
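Items 3 and 4 concern how Hive DDL is generated and executed. Below is a minimal sketch of the kind of statement building a `SqlBuilder` could own. The class `HiveDdlSketch`, its `Column` type and all method names are hypothetical and are not InLong's actual API. The statements assume HiveQL syntax (`CREATE DATABASE IF NOT EXISTS`, `ALTER TABLE ... ADD COLUMNS`), and executing them over JDBC is not shown.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: builds HiveQL DDL strings from a column list.
// Not the real InLong SqlBuilder; names and signatures are assumptions.
final class HiveDdlSketch {

    // Hypothetical column descriptor: name, Hive type, optional comment.
    static final class Column {
        final String name;
        final String type;
        final String comment;

        Column(String name, String type, String comment) {
            this.name = name;
            this.type = type;
            this.comment = comment;
        }

        // Renders "`name` TYPE [COMMENT '...']" for use inside a column list.
        String toDdl() {
            StringBuilder ddl = new StringBuilder("`").append(name).append("` ").append(type);
            if (comment != null && !comment.isEmpty()) {
                // Escape single quotes so the comment stays a valid string literal.
                ddl.append(" COMMENT '").append(comment.replace("'", "\\'")).append("'");
            }
            return ddl.toString();
        }
    }

    private HiveDdlSketch() {
    }

    static String createDatabase(String dbName) {
        return "CREATE DATABASE IF NOT EXISTS `" + dbName + "`";
    }

    static String createTable(String dbName, String tableName, List<Column> columns) {
        return "CREATE TABLE IF NOT EXISTS " + qualified(dbName, tableName)
                + " (" + joinColumns(columns) + ")";
    }

    static String addColumns(String dbName, String tableName, List<Column> columns) {
        return "ALTER TABLE " + qualified(dbName, tableName)
                + " ADD COLUMNS (" + joinColumns(columns) + ")";
    }

    // Quotes database and table names with backticks: `db`.`table`.
    private static String qualified(String dbName, String tableName) {
        return "`" + dbName + "`.`" + tableName + "`";
    }

    // Joins the rendered column definitions with ", ".
    private static String joinColumns(List<Column> columns) {
        return columns.stream().map(Column::toDdl).collect(Collectors.joining(", "));
    }
}
```

Using `IF NOT EXISTS` keeps repeated runs idempotent for the database and the table. Adding columns to an existing table still needs a separate comparison against the table's current schema.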
   
   ### Verifying this change
   
   - [X] This change is a trivial rework/code cleanup without any test coverage.
   
   ### Documentation
   
  - Does this pull request introduce a new feature? (no)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-inlong] woofyzhao commented on a diff in pull request #4129: [INLONG-4128][Manager] Abstracting the logic for creating Hive tables

Posted by GitBox <gi...@apache.org>.
woofyzhao commented on code in PR #4129:
URL: https://github.com/apache/incubator-inlong/pull/4129#discussion_r867591080


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/hive/HiveResourceOperator.java:
##########
@@ -84,87 +77,67 @@ public void createSinkResource(String groupId, SinkInfo sinkInfo) {
         this.createTable(groupId, sinkInfo);
     }
 
-    private void createTable(String groupId, SinkInfo config) {
+    private void createTable(String groupId, SinkInfo sinkInfo) {
         if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("begin create hive table for inlong group={}, config={}", groupId, config);
+            LOGGER.debug("begin to create hive table for group={}, sinkInfo={}", groupId, sinkInfo);
         }
 
-        // Get all info from config
-        HiveSinkDTO hiveInfo = HiveSinkDTO.getFromJson(config.getExtParams());
-        HiveTableQueryBean tableBean = getTableQueryBean(config, hiveInfo);
-        try {
-            // create database if not exists
-            dataSourceService.createDb(tableBean);
-
-            // check if the table exists
-            List<ColumnInfoBean> columns = dataSourceService.queryColumns(tableBean);
-            if (columns.size() == 0) {
-                // no such table, create one
-                dataSourceService.createTable(tableBean);
-            } else {
-                // set columns, skip the first columns already exist in hive
-                List<HiveColumnQueryBean> columnsSkipHistory = tableBean.getColumns().stream()
-                        .skip(columns.size()).collect(toList());
-                if (columnsSkipHistory.size() != 0) {
-                    tableBean.setColumns(columnsSkipHistory);
-                    dataSourceService.createColumn(tableBean);
-                }
-            }
-            sinkService.updateStatus(config.getId(),
-                    SinkStatus.CONFIG_SUCCESSFUL.getCode(), "create hive table success");
-        } catch (Throwable e) {
-            LOGGER.error("create hive table error, ", e);
-            sinkService.updateStatus(config.getId(), SinkStatus.CONFIG_FAILED.getCode(), e.getMessage());
-            throw new WorkflowException("create hive table failed, reason: " + e.getMessage());
+        String streamId = sinkInfo.getInlongStreamId();
+        List<StreamSinkFieldEntity> fieldList = hiveFieldMapper.selectFields(groupId, streamId);

Review Comment:
   This may return fields other than Hive fields when multiple sink types are configured on the same stream. Is filtering on sink type needed, or should the sink ID be used as the query parameter?
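The concern above matters because the removed block in this hunk chose the columns to add by position. When the table already existed, it skipped as many configured fields as the table had columns and passed the rest to `createColumn`. Below is a minimal sketch of that decision. `ColumnPlanSketch`, `Action` and `plan` are hypothetical names, and plain column-name lists stand in for `ColumnInfoBean` and `HiveColumnQueryBean`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of the create-or-extend decision in the removed code.
// Column names stand in for the real bean types.
final class ColumnPlanSketch {

    enum Action { CREATE_TABLE, ADD_COLUMNS, NOTHING }

    static final class Plan {
        final Action action;
        final List<String> columns;

        Plan(Action action, List<String> columns) {
            this.action = action;
            this.columns = columns;
        }
    }

    private ColumnPlanSketch() {
    }

    // existing: columns already in the Hive table (empty if the table does not exist).
    // configured: fields configured for the sink, in declaration order.
    static Plan plan(List<String> existing, List<String> configured) {
        if (existing.isEmpty()) {
            // No such table: create it with every configured column.
            return new Plan(Action.CREATE_TABLE, new ArrayList<>(configured));
        }
        // Positional rule: skip the first existing.size() configured fields,
        // as stream().skip(columns.size()) did in the removed code.
        int skip = Math.min(existing.size(), configured.size());
        List<String> toAdd = new ArrayList<>(configured.subList(skip, configured.size()));
        if (toAdd.isEmpty()) {
            return new Plan(Action.NOTHING, Collections.emptyList());
        }
        return new Plan(Action.ADD_COLUMNS, toAdd);
    }
}
```

Take `existing = [id, name]` and `configured = [id, name, payload]`, where `payload` hypothetically belongs to a different sink on the same stream. The plan is then `ADD_COLUMNS [payload]`, so a foreign field would be added to the Hive table. Restricting the field query to one sink avoids this.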





[GitHub] [incubator-inlong] healchow commented on a diff in pull request #4129: [INLONG-4128][Manager] Abstracting the logic for creating Hive tables

healchow commented on code in PR #4129:
URL: https://github.com/apache/incubator-inlong/pull/4129#discussion_r867596873


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/hive/HiveResourceOperator.java:
##########

Review Comment:
   Good idea. I'll filter it by sink id.
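The difference between querying by stream and querying by sink ID can be sketched in memory as below. `FieldRow`, `selectByStream` and `selectBySinkId` are hypothetical names. A list of rows stands in for the sink field table that `hiveFieldMapper.selectFields` reads, and the real mapper's query and signature may differ.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical in-memory stand-in for a sink field table.
final class SinkFieldFilterSketch {

    static final class FieldRow {
        final String groupId;
        final String streamId;
        final int sinkId;
        final String sinkType;
        final String fieldName;

        FieldRow(String groupId, String streamId, int sinkId, String sinkType, String fieldName) {
            this.groupId = groupId;
            this.streamId = streamId;
            this.sinkId = sinkId;
            this.sinkType = sinkType;
            this.fieldName = fieldName;
        }
    }

    private SinkFieldFilterSketch() {
    }

    // Like a (groupId, streamId) query: returns the fields of every sink on the stream.
    static List<String> selectByStream(List<FieldRow> rows, String groupId, String streamId) {
        return rows.stream()
                .filter(r -> r.groupId.equals(groupId) && r.streamId.equals(streamId))
                .map(r -> r.fieldName)
                .collect(Collectors.toList());
    }

    // Filtering by sink ID keeps only the fields of the one sink being created.
    static List<String> selectBySinkId(List<FieldRow> rows, int sinkId) {
        return rows.stream()
                .filter(r -> r.sinkId == sinkId)
                .map(r -> r.fieldName)
                .collect(Collectors.toList());
    }
}
```

A sink ID identifies exactly one sink. Filtering on it therefore also separates two sinks of the same type on one stream, which a sink-type filter alone would not do.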





[GitHub] [incubator-inlong] woofyzhao commented on a diff in pull request #4129: [INLONG-4128][Manager] Abstracting the logic for creating Hive tables

woofyzhao commented on code in PR #4129:
URL: https://github.com/apache/incubator-inlong/pull/4129#discussion_r867591080


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/hive/HiveResourceOperator.java:
##########

Review Comment:
   This may return fields other than Hive fields when multiple sink types are configured on the same stream. Is filtering on type needed?





[GitHub] [incubator-inlong] healchow merged pull request #4129: [INLONG-4128][Manager] Abstracting the logic for creating Hive tables

healchow merged PR #4129:
URL: https://github.com/apache/incubator-inlong/pull/4129

