You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/30 02:07:47 UTC

[inlong] branch release-1.3.0 updated (7a2f34588 -> 221853ec9)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a change to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


    from 7a2f34588 [INLONG-5718][Manager][DataProxy] Fix log4j2 configuration does not take effect (#5719)
     new 5c3b457d5 [INLONG-5723][DataProxy] Fix source and sink metrics are incorrect when msgType equals 5. (#5728)
     new edde3d996 [INLONG-5712][Sort] Fix class not found in Elasticsearch 6 connector for producting (#5717)
     new 2afe325af [INLONG-5729][Manager] Fix the failure of suspending or restarting the sources (#5730)
     new 221853ec9 [INLONG-5731][Dashboard] Add data node management (#5732)

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 inlong-dashboard/src/configs/menus/index.ts        |  4 +
 inlong-dashboard/src/configs/routes/index.tsx      | 10 +--
 inlong-dashboard/src/i18n.ts                       |  2 +
 inlong-dashboard/src/locales/cn.json               |  8 ++
 inlong-dashboard/src/locales/en.json               |  8 ++
 inlong-dashboard/src/metas/common.ts               |  5 +-
 .../metas/{clusters/TubeMQ.tsx => nodes/hive.tsx}  | 33 ++++----
 inlong-dashboard/src/metas/nodes/index.tsx         | 87 ++++++++++++++++++++++
 .../CreateModal.tsx => Nodes/DetailModal.tsx}      | 18 ++---
 .../src/pages/{Clusters => Nodes}/index.tsx        | 56 +++++---------
 .../dataproxy/source/ServerMessageHandler.java     | 44 ++++++-----
 .../manager/client/api/impl/InlongGroupImpl.java   | 51 +++++--------
 .../inlong/manager/pojo/group/InlongGroupInfo.java |  4 +-
 .../manager/pojo/group/InlongGroupRequest.java     |  4 +-
 .../main/resources/h2/apache_inlong_manager.sql    |  2 +-
 .../manager-web/sql/apache_inlong_manager.sql      | 18 ++---
 .../sort-connectors/elasticsearch-6/pom.xml        |  2 +-
 .../sort-connectors/elasticsearch-7/pom.xml        |  1 +
 inlong-sort/sort-connectors/hbase/pom.xml          |  1 +
 inlong-sort/sort-core/pom.xml                      |  6 ++
 .../sort/parser/Elasticsearch6SqlParseTest.java    | 13 ++--
 .../sort/parser/Elasticsearch7SqlParseTest.java    | 14 ++--
 .../sort/parser/ElasticsearchSqlParseTest.java     | 18 +++--
 pom.xml                                            |  1 +
 24 files changed, 245 insertions(+), 165 deletions(-)
 copy inlong-dashboard/src/metas/{clusters/TubeMQ.tsx => nodes/hive.tsx} (63%)
 create mode 100644 inlong-dashboard/src/metas/nodes/index.tsx
 copy inlong-dashboard/src/pages/{Clusters/CreateModal.tsx => Nodes/DetailModal.tsx} (86%)
 copy inlong-dashboard/src/pages/{Clusters => Nodes}/index.tsx (72%)
 copy inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/UtilsTest.java => inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/Elasticsearch6SqlParseTest.java (71%)
 copy inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/UtilsTest.java => inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/Elasticsearch7SqlParseTest.java (71%)


[inlong] 04/04: [INLONG-5731][Dashboard] Add data node management (#5732)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 221853ec9fd08a58473c9b8cf79ab3f5d0d17062
Author: Daniel <le...@apache.org>
AuthorDate: Mon Aug 29 19:30:17 2022 +0800

    [INLONG-5731][Dashboard] Add data node management (#5732)
---
 inlong-dashboard/src/configs/menus/index.ts      |   4 +
 inlong-dashboard/src/configs/routes/index.tsx    |  10 +-
 inlong-dashboard/src/i18n.ts                     |   2 +
 inlong-dashboard/src/locales/cn.json             |   8 +
 inlong-dashboard/src/locales/en.json             |   8 +
 inlong-dashboard/src/metas/common.ts             |   5 +-
 inlong-dashboard/src/metas/nodes/hive.tsx        |  47 ++++++
 inlong-dashboard/src/metas/nodes/index.tsx       |  87 +++++++++++
 inlong-dashboard/src/pages/Nodes/DetailModal.tsx | 105 +++++++++++++
 inlong-dashboard/src/pages/Nodes/index.tsx       | 182 +++++++++++++++++++++++
 10 files changed, 451 insertions(+), 7 deletions(-)

diff --git a/inlong-dashboard/src/configs/menus/index.ts b/inlong-dashboard/src/configs/menus/index.ts
index 3bed57279..12d134f38 100644
--- a/inlong-dashboard/src/configs/menus/index.ts
+++ b/inlong-dashboard/src/configs/menus/index.ts
@@ -48,6 +48,10 @@ const menus: MenuItemType[] = [
       },
     ],
   },
+  {
+    path: '/node',
+    name: i18n.t('configs.menus.Node'),
+  },
   {
     path: '/process',
     name: i18n.t('configs.menus.ApprovalManagement'),
diff --git a/inlong-dashboard/src/configs/routes/index.tsx b/inlong-dashboard/src/configs/routes/index.tsx
index 3391d54e0..c63c5983a 100644
--- a/inlong-dashboard/src/configs/routes/index.tsx
+++ b/inlong-dashboard/src/configs/routes/index.tsx
@@ -88,11 +88,6 @@ const routes: RouteProps[] = [
       },
     ],
   },
-  // {
-  //   path: '/dataSources',
-  //   component: () => import('@/pages/DataSources'),
-  //   exact: true,
-  // },
   {
     path: '/user',
     component: () => import('@/pages/UserManagement'),
@@ -120,6 +115,11 @@ const routes: RouteProps[] = [
     component: () => import('@/pages/ClusterTags'),
     exact: true,
   },
+  {
+    path: '/node',
+    component: () => import('@/pages/Nodes'),
+    exact: true,
+  },
   {
     component: () => import('@/pages/Error/404'),
   },
diff --git a/inlong-dashboard/src/i18n.ts b/inlong-dashboard/src/i18n.ts
index de60f1212..3c7e6a0fe 100644
--- a/inlong-dashboard/src/i18n.ts
+++ b/inlong-dashboard/src/i18n.ts
@@ -34,6 +34,7 @@ const resources = {
       'configs.menus.SystemManagement': 'System',
       'configs.menus.UserManagement': 'User',
       'configs.menus.ResponsibleManagement': 'ApprovalManagement',
+      'configs.menus.Node': 'Nodes',
     },
   },
   cn: {
@@ -47,6 +48,7 @@ const resources = {
       'configs.menus.SystemManagement': '系统管理',
       'configs.menus.UserManagement': '用户管理',
       'configs.menus.ResponsibleManagement': '审批责任人管理',
+      'configs.menus.Node': '节点管理',
     },
   },
 };
diff --git a/inlong-dashboard/src/locales/cn.json b/inlong-dashboard/src/locales/cn.json
index 328831c33..948e13cd5 100644
--- a/inlong-dashboard/src/locales/cn.json
+++ b/inlong-dashboard/src/locales/cn.json
@@ -263,6 +263,14 @@
   "meta.Consumption.MasterAddress": "Master地址",
   "meta.Consumption.Yes": "是",
   "meta.Consumption.OwnersExtra": "消费责任人,可查看、修改消费信息",
+  "meta.Nodes.Name": "节点名称",
+  "meta.Nodes.Type": "类型",
+  "meta.Nodes.Owners": "责任人",
+  "meta.Nodes.Description": "描述",
+  "meta.Nodes.Hive.DataPath": "数据路径",
+  "meta.Nodes.Hive.DataPathHelp": "DB的存储路径,不包括表名,如:hdfs://127.0.0.1:9000/warehouse/inlong.db",
+  "meta.Nodes.Hive.ConfDir": "配置路径",
+  "meta.Nodes.Hive.ConfDirHelp": "将 Hive 集群的 hive-site.xml 文件上传到 HDFS 的某个目录下,如:/user/hive/conf",
   "components.EditableTable.NewLine": "新增一行",
   "components.FormGenerator.plugins.PleaseChoose": "请选择",
   "components.FormGenerator.plugins.PleaseInput": "请输入",
diff --git a/inlong-dashboard/src/locales/en.json b/inlong-dashboard/src/locales/en.json
index 8838587c3..266b5a3ee 100644
--- a/inlong-dashboard/src/locales/en.json
+++ b/inlong-dashboard/src/locales/en.json
@@ -263,6 +263,14 @@
   "meta.Consumption.MasterAddress": "Master address",
   "meta.Consumption.Yes": "Yes",
   "meta.Consumption.OwnersExtra": "Consumption in charges, they can view, modify consumption information",
+  "meta.Nodes.Name": "Name",
+  "meta.Nodes.Type": "Type",
+  "meta.Nodes.Owners": "Owners",
+  "meta.Nodes.Description": "Description",
+  "meta.Nodes.Hive.DataPath": "DataPath",
+  "meta.Nodes.Hive.DataPathHelp": "Storage path of the DB, excluding the table name, such as: hdfs://127.0.0.1:9000/warehouse/inlong.db",
+  "meta.Nodes.Hive.ConfDir": "ConfDir",
+  "meta.Nodes.Hive.ConfDirHelp": "Upload the hive-site.xml file of the Hive cluster to a directory in HDFS, such as: /user/hive/conf",
   "components.EditableTable.NewLine": "New line",
   "components.FormGenerator.plugins.PleaseChoose": "Please select",
   "components.FormGenerator.plugins.PleaseInput": "Please input",
diff --git a/inlong-dashboard/src/metas/common.ts b/inlong-dashboard/src/metas/common.ts
index 59a7c293f..0c32bbd37 100644
--- a/inlong-dashboard/src/metas/common.ts
+++ b/inlong-dashboard/src/metas/common.ts
@@ -28,12 +28,13 @@ export interface FieldItemType extends FormItemProps {
 
 export const genFields = (
   fieldsDefault: FieldItemType[],
-  fieldsExtends: FieldItemType[],
-): FormItemProps[] => {
+  fieldsExtends?: FieldItemType[],
+): FieldItemType[] => {
   const output: FieldItemType[] = [];
   const fields = fieldsDefault.concat(fieldsExtends);
   while (fields.length) {
     const fieldItem = fields.shift();
+    if (!fieldItem) continue;
     if (fieldItem.position) {
       const [positionType, positionName] = fieldItem.position;
       const index = output.findIndex(item => item.name === positionName);
diff --git a/inlong-dashboard/src/metas/nodes/hive.tsx b/inlong-dashboard/src/metas/nodes/hive.tsx
new file mode 100644
index 000000000..74436f2dc
--- /dev/null
+++ b/inlong-dashboard/src/metas/nodes/hive.tsx
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import i18n from '@/i18n';
+import type { FieldItemType } from '@/metas/common';
+
+export const hive: FieldItemType[] = [
+  {
+    type: 'input',
+    label: 'JDBC URL',
+    name: 'jdbcUrl',
+    rules: [{ required: true }],
+    initialValue: 'jdbc:hive2://127.0.0.1:10000',
+  },
+  {
+    type: 'input',
+    label: i18n.t('meta.Sinks.Hive.DataPath'),
+    name: 'dataPath',
+    rules: [{ required: true }],
+    tooltip: i18n.t('meta.Sinks.DataPathHelp'),
+    initialValue: 'hdfs://127.0.0.1:9000/user/hive/warehouse/default',
+  },
+  {
+    type: 'input',
+    label: i18n.t('meta.Sinks.Hive.ConfDir'),
+    name: 'hiveConfDir',
+    rules: [{ required: true }],
+    tooltip: i18n.t('meta.Sinks.Hive.ConfDirHelp'),
+    initialValue: '/usr/hive/conf',
+  },
+];
diff --git a/inlong-dashboard/src/metas/nodes/index.tsx b/inlong-dashboard/src/metas/nodes/index.tsx
new file mode 100644
index 000000000..c2d47b5fa
--- /dev/null
+++ b/inlong-dashboard/src/metas/nodes/index.tsx
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import React from 'react';
+import i18n from '@/i18n';
+import UserSelect from '@/components/UserSelect';
+import type { FieldItemType } from '@/metas/common';
+import { genFields, genForm, genTable } from '@/metas/common';
+import { hive } from './hive';
+
+const allNodes = [
+  {
+    label: 'Hive',
+    value: 'HIVE',
+    fields: hive,
+  },
+];
+
+const defaultCommonFields: FieldItemType[] = [
+  {
+    type: 'input',
+    label: i18n.t('meta.Nodes.Name'),
+    name: 'name',
+    rules: [{ required: true }],
+    props: {
+      maxLength: 128,
+    },
+    _renderTable: true,
+  },
+  {
+    type: 'select',
+    label: i18n.t('meta.Nodes.Type'),
+    name: 'type',
+    initialValue: allNodes[0].value,
+    rules: [{ required: true }],
+    props: {
+      options: allNodes.map(item => ({
+        label: item.label,
+        value: item.value,
+      })),
+    },
+    _renderTable: true,
+  },
+  {
+    type: <UserSelect mode="multiple" currentUserClosable={false} />,
+    label: i18n.t('meta.Nodes.Owners'),
+    name: 'inCharges',
+    rules: [{ required: true }],
+    _renderTable: true,
+  },
+  {
+    type: 'textarea',
+    label: i18n.t('meta.Nodes.Description'),
+    name: 'description',
+    props: {
+      maxLength: 256,
+    },
+  },
+];
+
+export const nodes = allNodes.map(item => {
+  const itemFields = defaultCommonFields.concat(item.fields);
+  const fields = genFields(itemFields);
+
+  return {
+    ...item,
+    fields,
+    form: genForm(fields),
+    table: genTable(fields),
+  };
+});
diff --git a/inlong-dashboard/src/pages/Nodes/DetailModal.tsx b/inlong-dashboard/src/pages/Nodes/DetailModal.tsx
new file mode 100644
index 000000000..b6c672ada
--- /dev/null
+++ b/inlong-dashboard/src/pages/Nodes/DetailModal.tsx
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import React, { useState, useMemo } from 'react';
+import { Modal, message } from 'antd';
+import { ModalProps } from 'antd/es/modal';
+import FormGenerator, { useForm } from '@/components/FormGenerator';
+import { useRequest, useUpdateEffect } from '@/hooks';
+import request from '@/utils/request';
+import { nodes } from '@/metas/nodes';
+import i18n from '@/i18n';
+
+export interface Props extends ModalProps {
+  // Require when edit
+  id?: string;
+}
+
+const Comp: React.FC<Props> = ({ id, ...modalProps }) => {
+  const [form] = useForm();
+
+  const [type, setType] = useState(nodes[0].value);
+
+  const { data: savedData, run: getData } = useRequest(
+    id => ({
+      url: `/node/get/${id}`,
+    }),
+    {
+      manual: true,
+      formatResult: result => ({
+        ...result,
+        inCharges: result.inCharges?.split(','),
+        clusterTags: result.clusterTags?.split(','),
+      }),
+      onSuccess: result => {
+        form.setFieldsValue(result);
+        setType(result.type);
+      },
+    },
+  );
+
+  const onOk = async () => {
+    const values = await form.validateFields();
+    const isUpdate = id;
+    const submitData = {
+      ...values,
+      inCharges: values.inCharges?.join(','),
+      clusterTags: values.clusterTags?.join(','),
+    };
+    if (isUpdate) {
+      submitData.id = id;
+      submitData.version = savedData?.version;
+    }
+    await request({
+      url: `/node/${isUpdate ? 'update' : 'save'}`,
+      method: 'POST',
+      data: submitData,
+    });
+    await modalProps?.onOk(submitData);
+    message.success(i18n.t('basic.OperatingSuccess'));
+  };
+
+  useUpdateEffect(() => {
+    if (modalProps.visible) {
+      // open
+      form.resetFields();
+      if (id) {
+        getData(id);
+      }
+    }
+  }, [modalProps.visible]);
+
+  const content = useMemo(() => {
+    const current = nodes.find(item => item.value === type);
+    return current?.form;
+  }, [type]);
+
+  return (
+    <Modal {...modalProps} title={id ? i18n.t('basic.Detail') : i18n.t('basic.Create')} onOk={onOk}>
+      <FormGenerator
+        content={content}
+        form={form}
+        onValuesChange={(c, values) => setType(values.type)}
+        useMaxWidth
+      />
+    </Modal>
+  );
+};
+
+export default Comp;
diff --git a/inlong-dashboard/src/pages/Nodes/index.tsx b/inlong-dashboard/src/pages/Nodes/index.tsx
new file mode 100644
index 000000000..37c25957f
--- /dev/null
+++ b/inlong-dashboard/src/pages/Nodes/index.tsx
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import React, { useCallback, useMemo, useState } from 'react';
+import { Button, Modal, message } from 'antd';
+import i18n from '@/i18n';
+import HighTable from '@/components/HighTable';
+import { PageContainer } from '@/components/PageContainer';
+import { defaultSize } from '@/configs/pagination';
+import { useRequest } from '@/hooks';
+import { nodes } from '@/metas/nodes';
+import DetailModal from './DetailModal';
+import request from '@/utils/request';
+
+const getFilterFormContent = defaultValues => [
+  {
+    type: 'inputsearch',
+    name: 'keyword',
+  },
+  {
+    type: 'radiobutton',
+    name: 'type',
+    label: i18n.t('meta.Nodes.Type'),
+    initialValue: defaultValues.type,
+    props: {
+      buttonStyle: 'solid',
+      options: nodes.map(item => ({
+        label: item.label,
+        value: item.value,
+      })),
+    },
+  },
+];
+
+const Comp: React.FC = () => {
+  const [options, setOptions] = useState({
+    keyword: '',
+    pageSize: defaultSize,
+    pageNum: 1,
+    type: nodes[0].value,
+  });
+
+  const [detailModal, setDetailModal] = useState<Record<string, unknown>>({
+    visible: false,
+  });
+
+  const { data, loading, run: getList } = useRequest(
+    {
+      url: '/node/list',
+      method: 'POST',
+      data: {
+        ...options,
+      },
+    },
+    {
+      refreshDeps: [options],
+    },
+  );
+
+  const onEdit = ({ id }) => {
+    setDetailModal({ visible: true, id });
+  };
+
+  const onDelete = useCallback(
+    ({ id }) => {
+      Modal.confirm({
+        title: i18n.t('basic.DeleteConfirm'),
+        onOk: async () => {
+          await request({
+            url: `/node/delete/${id}`,
+            method: 'DELETE',
+          });
+          await getList();
+          message.success(i18n.t('basic.DeleteSuccess'));
+        },
+      });
+    },
+    [getList],
+  );
+
+  const onChange = ({ current: pageNum, pageSize }) => {
+    setOptions(prev => ({
+      ...prev,
+      pageNum,
+      pageSize,
+    }));
+  };
+
+  const onFilter = allValues => {
+    setOptions(prev => ({
+      ...prev,
+      ...allValues,
+      pageNum: 1,
+    }));
+  };
+
+  const pagination = {
+    pageSize: +options.pageSize,
+    current: +options.pageNum,
+    total: data?.total,
+  };
+
+  const columns = useMemo(() => {
+    const current = nodes.find(item => item.value === options.type);
+    if (!current?.table) return [];
+
+    return current.table
+      .map(item => ({
+        ...item,
+        ellipsisMulti: 2,
+      }))
+      .concat([
+        {
+          title: i18n.t('basic.Operating'),
+          dataIndex: 'action',
+          width: 200,
+          render: (text, record) => (
+            <>
+              <Button type="link" onClick={() => onEdit(record)}>
+                {i18n.t('basic.Edit')}
+              </Button>
+              <Button type="link" onClick={() => onDelete(record)}>
+                {i18n.t('basic.Delete')}
+              </Button>
+            </>
+          ),
+        } as any,
+      ]);
+  }, [options.type, onDelete]);
+
+  return (
+    <PageContainer useDefaultBreadcrumb={false}>
+      <HighTable
+        filterForm={{
+          content: getFilterFormContent(options),
+          onFilter,
+        }}
+        suffix={
+          <Button type="primary" onClick={() => setDetailModal({ visible: true })}>
+            {i18n.t('basic.Create')}
+          </Button>
+        }
+        table={{
+          columns,
+          rowKey: 'id',
+          dataSource: data?.list,
+          pagination,
+          loading,
+          onChange,
+        }}
+      />
+
+      <DetailModal
+        {...detailModal}
+        visible={detailModal.visible as boolean}
+        onOk={async () => {
+          await getList();
+          setDetailModal({ visible: false });
+        }}
+        onCancel={() => setDetailModal({ visible: false })}
+      />
+    </PageContainer>
+  );
+};
+
+export default Comp;


[inlong] 01/04: [INLONG-5723][DataProxy] Fix source and sink metrics are incorrect when msgType equals 5. (#5728)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 5c3b457d54c7a38301b685f238e5de0c7f2b3ca2
Author: xueyingzhang <86...@users.noreply.github.com>
AuthorDate: Mon Aug 29 14:08:10 2022 +0800

    [INLONG-5723][DataProxy] Fix source and sink metrics are incorrect when msgType equals 5. (#5728)
---
 .../dataproxy/source/ServerMessageHandler.java     | 44 ++++++++++++----------
 1 file changed, 24 insertions(+), 20 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 8c158d3ce..d79143260 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -17,28 +17,14 @@
 
 package org.apache.inlong.dataproxy.source;
 
-import static org.apache.inlong.dataproxy.consts.AttributeConstants.SEPARATOR;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_DATA;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_GROUPID;
-import static org.apache.inlong.dataproxy.source.SimpleTcpSource.blacklist;
-
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.group.ChannelGroup;
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Event;
@@ -63,8 +49,21 @@ import org.apache.inlong.dataproxy.utils.NetworkUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.inlong.dataproxy.consts.AttributeConstants.SEPARATOR;
+import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_DATA;
+import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_GROUPID;
+import static org.apache.inlong.dataproxy.source.SimpleTcpSource.blacklist;
 
 /**
  * Server message handler
@@ -400,6 +399,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
         } else if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
             inLongMsgVer = 4;
         }
+        int recordMsgCnt = Integer.parseInt(commonAttrMap.get(AttributeConstants.MESSAGE_COUNT));
 
         for (Map.Entry<String, HashMap<String, List<ProxyMessage>>> topicEntry : messageMap.entrySet()) {
             for (Map.Entry<String, List<ProxyMessage>> streamIdEntry : topicEntry.getValue().entrySet()) {
@@ -456,6 +456,10 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
                 headers.put(ConfigConstants.REMOTE_IDC_KEY, DEFAULT_REMOTE_IDC_VALUE);
                 // every message share the same msg cnt? what if msgType = 5
                 String proxyMetricMsgCnt = commonAttrMap.get(AttributeConstants.MESSAGE_COUNT);
+                if (MsgType.MSG_MULTI_BODY_ATTR.equals(msgType) || MsgType.MSG_MULTI_BODY.equals(msgType)) {
+                    commonAttrMap.put(AttributeConstants.MESSAGE_COUNT, String.valueOf(recordMsgCnt));
+                    proxyMetricMsgCnt = commonAttrMap.get(AttributeConstants.MESSAGE_COUNT);
+                }
                 headers.put(ConfigConstants.MSG_COUNTER_KEY, proxyMetricMsgCnt);
 
                 byte[] data = inLongMsg.buildArray();
@@ -510,7 +514,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
                 } catch (Throwable ex) {
                     logger.error("Error writting to channel,data will discard.", ex);
                     monitorIndexExt.incrementAndGet("EVENT_DROPPED");
-                    monitorIndex.addAndGet(new String(newbase), 0,0,0,
+                    monitorIndex.addAndGet(new String(newbase), 0, 0, 0,
                             Integer.parseInt(proxyMetricMsgCnt));
                     this.addMetric(false, data.length, event);
                     throw new ChannelException("ProcessEvent error can't write event to channel.");
@@ -731,7 +735,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
 
     /**
      * addMetric
-     * 
+     *
      * @param result
      * @param size
      * @param event


[inlong] 03/04: [INLONG-5729][Manager] Fix the failure of suspending or restarting the sources (#5730)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 2afe325afb48a2244f1a2b252f61b4acdc83dcb5
Author: healchow <he...@gmail.com>
AuthorDate: Mon Aug 29 19:27:57 2022 +0800

    [INLONG-5729][Manager] Fix the failure of suspending or restarting the sources (#5730)
---
 .../manager/client/api/impl/InlongGroupImpl.java   | 51 ++++++++--------------
 .../inlong/manager/pojo/group/InlongGroupInfo.java |  4 +-
 .../manager/pojo/group/InlongGroupRequest.java     |  4 +-
 .../main/resources/h2/apache_inlong_manager.sql    |  2 +-
 .../manager-web/sql/apache_inlong_manager.sql      | 18 ++++----
 5 files changed, 31 insertions(+), 48 deletions(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
index 403ff81b2..348dcf3ea 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
@@ -24,7 +24,6 @@ import org.apache.inlong.manager.client.api.InlongGroup;
 import org.apache.inlong.manager.client.api.InlongGroupContext;
 import org.apache.inlong.manager.client.api.InlongStream;
 import org.apache.inlong.manager.client.api.InlongStreamBuilder;
-import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
 import org.apache.inlong.manager.client.api.inner.InnerGroupContext;
 import org.apache.inlong.manager.client.api.inner.client.ClientFactory;
 import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient;
@@ -34,11 +33,11 @@ import org.apache.inlong.manager.client.api.util.ClientUtils;
 import org.apache.inlong.manager.client.api.util.InlongGroupTransfer;
 import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.enums.ProcessStatus;
+import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
 import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.pojo.group.InlongGroupCountResponse;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupResetRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
 import org.apache.inlong.manager.pojo.sort.BaseSortConf;
@@ -147,41 +146,29 @@ public class InlongGroupImpl implements InlongGroup {
         Preconditions.checkTrue(groupId != null && groupId.equals(this.groupInfo.getInlongGroupId()),
                 "groupId must be same");
 
-        InlongGroupInfo existGroupInfo = groupClient.getGroupInfo(groupId);
-        Preconditions.checkNotNull(existGroupInfo, "inlong group does not exists, cannot update");
-        SimpleGroupStatus status = SimpleGroupStatus.parseStatusByCode(existGroupInfo.getStatus());
-        Preconditions.checkTrue(status != SimpleGroupStatus.INITIALIZING,
-                "inlong group is in init status, should not be updated");
-
         InlongGroupInfo groupInfo = InlongGroupTransfer.createGroupInfo(originGroupInfo, sortConf);
-        groupInfo.setVersion(existGroupInfo.getVersion());
-        InlongGroupRequest groupRequest = groupInfo.genRequest();
-        Pair<String, String> idAndErr = groupClient.updateGroup(groupRequest);
-        String errMsg = idAndErr.getValue();
-        Preconditions.checkNull(errMsg, errMsg);
-
-        this.groupContext.setGroupInfo(groupInfo);
-        this.groupInfo = groupInfo;
+        this.updateOpt(groupInfo);
+        this.groupInfo = this.groupContext.getGroupInfo();
     }
 
     @Override
     public void update(BaseSortConf sortConf) throws Exception {
         Preconditions.checkNotNull(sortConf, "sort conf cannot be null");
+        this.updateOpt(InlongGroupTransfer.createGroupInfo(this.groupInfo, sortConf));
+    }
 
-        final String groupId = this.groupInfo.getInlongGroupId();
-
-        InlongGroupInfo existGroupInfo = groupClient.getGroupInfo(groupId);
-        Preconditions.checkNotNull(existGroupInfo, "inlong group does not exists, cannot update");
+    private void updateOpt(InlongGroupInfo groupInfo) {
+        InlongGroupInfo existGroupInfo = groupClient.getGroupInfo(groupInfo.getInlongGroupId());
+        Preconditions.checkNotNull(existGroupInfo, "inlong group does not exist, cannot be updated");
         SimpleGroupStatus status = SimpleGroupStatus.parseStatusByCode(existGroupInfo.getStatus());
         Preconditions.checkTrue(status != SimpleGroupStatus.INITIALIZING,
-                "inlong group is in init status, should not be updated");
+                "inlong group is in init status, cannot be updated");
 
-        InlongGroupInfo groupInfo = InlongGroupTransfer.createGroupInfo(this.groupInfo, sortConf);
         groupInfo.setVersion(existGroupInfo.getVersion());
-        InlongGroupRequest groupRequest = groupInfo.genRequest();
-        Pair<String, String> idAndErr = groupClient.updateGroup(groupRequest);
+        Pair<String, String> idAndErr = groupClient.updateGroup(groupInfo.genRequest());
         String errMsg = idAndErr.getValue();
         Preconditions.checkNull(errMsg, errMsg);
+
         this.groupContext.setGroupInfo(groupInfo);
     }
 
@@ -207,11 +194,9 @@ public class InlongGroupImpl implements InlongGroup {
     @Override
     public InlongGroupContext suspend(boolean async) {
         InlongGroupInfo groupInfo = groupContext.getGroupInfo();
-        Pair<String, String> idAndErr = groupClient.updateGroup(groupInfo.genRequest());
-        final String errMsg = idAndErr.getValue();
-        final String groupId = idAndErr.getKey();
-        Preconditions.checkNull(errMsg, errMsg);
-        groupClient.operateInlongGroup(groupId, SimpleGroupStatus.STOPPED, async);
+        this.updateOpt(groupInfo);
+
+        groupClient.operateInlongGroup(groupInfo.getInlongGroupId(), SimpleGroupStatus.STOPPED, async);
         return generateSnapshot();
     }
 
@@ -223,11 +208,9 @@ public class InlongGroupImpl implements InlongGroup {
     @Override
     public InlongGroupContext restart(boolean async) {
         InlongGroupInfo groupInfo = groupContext.getGroupInfo();
-        Pair<String, String> idAndErr = groupClient.updateGroup(groupInfo.genRequest());
-        final String errMsg = idAndErr.getValue();
-        final String groupId = idAndErr.getKey();
-        Preconditions.checkNull(errMsg, errMsg);
-        groupClient.operateInlongGroup(groupId, SimpleGroupStatus.STARTED, async);
+        this.updateOpt(groupInfo);
+
+        groupClient.operateInlongGroup(groupInfo.getInlongGroupId(), SimpleGroupStatus.STARTED, async);
         return generateSnapshot();
     }
 
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupInfo.java
index 5f3b22754..0b8e93656 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupInfo.java
@@ -71,10 +71,10 @@ public abstract class InlongGroupInfo {
     @Builder.Default
     private Integer enableZookeeper = 0;
 
-    @ApiModelProperty(value = "Whether to enable zookeeper? 0: disable, 1: enable")
+    @ApiModelProperty(value = "Whether to enable create resource? 0: disable, 1: enable")
     private Integer enableCreateResource;
 
-    @ApiModelProperty(value = "Whether to use lightweight mode, 0: false, 1: true")
+    @ApiModelProperty(value = "Whether to use lightweight mode, 0: no, 1: yes")
     private Integer lightweight;
 
     @ApiModelProperty(value = "Inlong cluster tag, which links to inlong_cluster table")
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
index f687e43d9..3c07a9c79 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
@@ -70,10 +70,10 @@ public abstract class InlongGroupRequest {
     @ApiModelProperty(value = "Whether to enable zookeeper? 0: disable, 1: enable")
     private Integer enableZookeeper = 0;
 
-    @ApiModelProperty(value = "Whether to enable zookeeper? 0: disable, 1: enable")
+    @ApiModelProperty(value = "Whether to enable create resource? 0: disable, 1: enable")
     private Integer enableCreateResource = 1;
 
-    @ApiModelProperty(value = "Whether to use lightweight mode, 0: false, 1: true")
+    @ApiModelProperty(value = "Whether to use lightweight mode, 0: no, 1: yes")
     private Integer lightweight = 0;
 
     @ApiModelProperty(value = "Inlong cluster tag, which links to inlong_cluster table")
diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index 99a22413c..b5e12851a 100644
--- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -35,7 +35,7 @@ CREATE TABLE IF NOT EXISTS `inlong_group`
     `max_length`             int(11)               DEFAULT '10240' COMMENT 'The maximum length of a single piece of data, unit: Byte',
     `enable_zookeeper`       tinyint(1)            DEFAULT '0' COMMENT 'Whether to enable the zookeeper, 0-disable, 1-enable',
     `enable_create_resource` tinyint(1)            DEFAULT '1' COMMENT 'Whether to enable create resource? 0-disable, 1-enable',
-    `lightweight`            tinyint(1)            DEFAULT '0' COMMENT 'Whether to use lightweight mode, 0-false, 1-true',
+    `lightweight`            tinyint(1)            DEFAULT '0' COMMENT 'Whether to use lightweight mode, 0-no, 1-yes',
     `inlong_cluster_tag`     varchar(128)          DEFAULT NULL COMMENT 'The cluster tag, which links to inlong_cluster table',
     `ext_params`             text                  DEFAULT NULL COMMENT 'Extended params, will be saved as JSON string, such as queue_module, partition_num',
     `in_charges`             varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index e7bbf585b..25aae58ca 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -41,7 +41,7 @@ CREATE TABLE IF NOT EXISTS `inlong_group`
     `max_length`             int(11)               DEFAULT '10240' COMMENT 'The maximum length of a single piece of data, unit: Byte',
     `enable_zookeeper`       tinyint(1)            DEFAULT '0' COMMENT 'Whether to enable the zookeeper, 0-disable, 1-enable',
     `enable_create_resource` tinyint(1)            DEFAULT '1' COMMENT 'Whether to enable create resource? 0-disable, 1-enable',
-    `lightweight`            tinyint(1)            DEFAULT '0' COMMENT 'Whether to use lightweight mode, 0-false, 1-true',
+    `lightweight`            tinyint(1)            DEFAULT '0' COMMENT 'Whether to use lightweight mode, 0-no, 1-yes',
     `inlong_cluster_tag`     varchar(128)          DEFAULT NULL COMMENT 'The cluster tag, which links to inlong_cluster table',
     `ext_params`             text                  DEFAULT NULL COMMENT 'Extended params, will be saved as JSON string, such as queue_module, partition_num',
     `in_charges`             varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
@@ -402,7 +402,7 @@ CREATE TABLE IF NOT EXISTS `stream_source`
     `inlong_stream_id`    varchar(256) NOT NULL COMMENT 'Inlong stream id',
     `source_name`         varchar(128) NOT NULL DEFAULT '' COMMENT 'source_name',
     `source_type`         varchar(20)           DEFAULT '0' COMMENT 'Source type, including: FILE, DB, etc',
-    `template_id`         int(11)      DEFAULT NULL COMMENT 'Id of the template task this agent belongs to',
+    `template_id`         int(11)               DEFAULT NULL COMMENT 'Id of the template task this agent belongs to',
     `agent_ip`            varchar(40)           DEFAULT NULL COMMENT 'Ip of the agent running the task, NULL if this is a template task',
     `uuid`                varchar(30)           DEFAULT NULL COMMENT 'Mac uuid of the agent running the task',
     `data_node_name`      varchar(128)          DEFAULT NULL COMMENT 'Node name, which links to data_node table',
@@ -576,7 +576,7 @@ CREATE TABLE IF NOT EXISTS `user`
     `encrypt_version` int(11)               DEFAULT NULL COMMENT 'Encryption key version',
     `account_type`    int(11)      NOT NULL DEFAULT '1' COMMENT 'Account type, 0-manager 1-normal',
     `due_date`        datetime              DEFAULT NULL COMMENT 'Due date for user',
-    `ext_params`      text         COMMENT 'Json extension info',
+    `ext_params`      text COMMENT 'Json extension info',
     `status`          int(11)               DEFAULT '100' COMMENT 'Status',
     `is_deleted`      int(11)               DEFAULT '0' COMMENT 'Whether to delete, 0 is not deleted, if greater than 0, delete',
     `creator`         varchar(256) NOT NULL COMMENT 'Creator name',
@@ -794,8 +794,8 @@ CREATE TABLE IF NOT EXISTS `component_heartbeat`
     `id`               int(11)     NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
     `component`        varchar(64) NOT NULL DEFAULT '' COMMENT 'Component name, such as: Agent, Sort...',
     `instance`         varchar(64) NOT NULL DEFAULT '' COMMENT 'Component instance, can be ip, name...',
-    `status_heartbeat` text        DEFAULT NULL COMMENT 'Status heartbeat info',
-    `metric_heartbeat` text        DEFAULT NULL COMMENT 'Metric heartbeat info',
+    `status_heartbeat` text                 DEFAULT NULL COMMENT 'Status heartbeat info',
+    `metric_heartbeat` text                 DEFAULT NULL COMMENT 'Metric heartbeat info',
     `report_time`      bigint(20)  NOT NULL COMMENT 'Report time',
     `create_time`      timestamp   NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
     `modify_time`      timestamp   NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
@@ -813,8 +813,8 @@ CREATE TABLE IF NOT EXISTS `group_heartbeat`
     `component`        varchar(64)  NOT NULL DEFAULT '' COMMENT 'Component name, such as: Agent, Sort...',
     `instance`         varchar(64)  NOT NULL DEFAULT '' COMMENT 'Component instance, can be ip, name...',
     `inlong_group_id`  varchar(256) NOT NULL DEFAULT '' COMMENT 'Owning inlong group id',
-    `status_heartbeat` text         DEFAULT NULL COMMENT 'Status heartbeat info',
-    `metric_heartbeat` text         DEFAULT NULL COMMENT 'Metric heartbeat info',
+    `status_heartbeat` text                  DEFAULT NULL COMMENT 'Status heartbeat info',
+    `metric_heartbeat` text                  DEFAULT NULL COMMENT 'Metric heartbeat info',
     `report_time`      bigint(20)   NOT NULL COMMENT 'Report time',
     `create_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
     `modify_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
@@ -833,8 +833,8 @@ CREATE TABLE IF NOT EXISTS `stream_heartbeat`
     `instance`         varchar(64)  NOT NULL DEFAULT '' COMMENT 'Component instance, can be ip, name...',
     `inlong_group_id`  varchar(256) NOT NULL DEFAULT '' COMMENT 'Owning inlong group id',
     `inlong_stream_id` varchar(256) NOT NULL DEFAULT '' COMMENT 'Owning inlong stream id',
-    `status_heartbeat` text         DEFAULT NULL COMMENT 'Status heartbeat info',
-    `metric_heartbeat` text         DEFAULT NULL COMMENT 'Metric heartbeat info',
+    `status_heartbeat` text                  DEFAULT NULL COMMENT 'Status heartbeat info',
+    `metric_heartbeat` text                  DEFAULT NULL COMMENT 'Metric heartbeat info',
     `report_time`      bigint(20)   NOT NULL COMMENT 'Report time',
     `create_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
     `modify_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',


[inlong] 02/04: [INLONG-5712][Sort] Fix class not found in Elasticsearch 6 connector for producting (#5717)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit edde3d9963c03ca288942860a0a3a0eda19f3b27
Author: Oneal65 <li...@foxmail.com>
AuthorDate: Mon Aug 29 14:19:09 2022 +0800

    [INLONG-5712][Sort] Fix class not found in Elasticsearch 6 connector for producting (#5717)
---
 .../sort-connectors/elasticsearch-6/pom.xml        |  2 +-
 .../sort-connectors/elasticsearch-7/pom.xml        |  1 +
 inlong-sort/sort-connectors/hbase/pom.xml          |  1 +
 inlong-sort/sort-core/pom.xml                      |  6 ++++
 .../sort/parser/Elasticsearch6SqlParseTest.java    | 32 ++++++++++++++++++++++
 .../sort/parser/Elasticsearch7SqlParseTest.java    | 31 +++++++++++++++++++++
 .../sort/parser/ElasticsearchSqlParseTest.java     | 18 +++++++-----
 pom.xml                                            |  1 +
 8 files changed, 84 insertions(+), 8 deletions(-)

diff --git a/inlong-sort/sort-connectors/elasticsearch-6/pom.xml b/inlong-sort/sort-connectors/elasticsearch-6/pom.xml
index 066fe9412..393544ee7 100644
--- a/inlong-sort/sort-connectors/elasticsearch-6/pom.xml
+++ b/inlong-sort/sort-connectors/elasticsearch-6/pom.xml
@@ -158,6 +158,7 @@
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-shade-plugin</artifactId>
+                <version>${plugin.shade.version}</version>
                 <executions>
                     <execution>
                         <id>shade-flink</id>
@@ -173,7 +174,6 @@
                                 </includes>
                                 <excludes>
                                     <!-- These dependencies are not required. -->
-                                    <exclude>com.carrotsearch:hppc</exclude>
                                     <exclude>com.tdunning:t-digest</exclude>
                                     <exclude>joda-time:joda-time</exclude>
                                     <exclude>net.sf.jopt-simple:jopt-simple</exclude>
diff --git a/inlong-sort/sort-connectors/elasticsearch-7/pom.xml b/inlong-sort/sort-connectors/elasticsearch-7/pom.xml
index 9b57c1e9f..da0d0299f 100644
--- a/inlong-sort/sort-connectors/elasticsearch-7/pom.xml
+++ b/inlong-sort/sort-connectors/elasticsearch-7/pom.xml
@@ -147,6 +147,7 @@
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-shade-plugin</artifactId>
+                <version>${plugin.shade.version}</version>
                 <executions>
                     <execution>
                         <id>shade-flink</id>
diff --git a/inlong-sort/sort-connectors/hbase/pom.xml b/inlong-sort/sort-connectors/hbase/pom.xml
index b7275ca24..17bde3827 100644
--- a/inlong-sort/sort-connectors/hbase/pom.xml
+++ b/inlong-sort/sort-connectors/hbase/pom.xml
@@ -74,6 +74,7 @@
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-shade-plugin</artifactId>
+                <version>${plugin.shade.version}</version>
                 <executions>
                     <execution>
                         <id>shade-flink</id>
diff --git a/inlong-sort/sort-core/pom.xml b/inlong-sort/sort-core/pom.xml
index 1d3951517..aabd3e7ce 100644
--- a/inlong-sort/sort-core/pom.xml
+++ b/inlong-sort/sort-core/pom.xml
@@ -112,6 +112,12 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-elasticsearch6</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.inlong</groupId>
             <artifactId>sort-connector-elasticsearch7</artifactId>
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/Elasticsearch6SqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/Elasticsearch6SqlParseTest.java
new file mode 100644
index 000000000..b34a6d129
--- /dev/null
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/Elasticsearch6SqlParseTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.parser;
+
+import org.apache.inlong.sort.protocol.node.Node;
+import org.junit.Test;
+
+public class Elasticsearch6SqlParseTest extends ElasticsearchSqlParseTest {
+
+    @Test
+    public void testMysqlToElasticsearch6() throws  Exception {
+        Node outputNode = this.buildElasticsearchLoadNode(6);
+        this.testMysqlToElasticsearch(outputNode);
+    }
+
+}
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/Elasticsearch7SqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/Elasticsearch7SqlParseTest.java
new file mode 100644
index 000000000..190f9e114
--- /dev/null
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/Elasticsearch7SqlParseTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.parser;
+
+import org.apache.inlong.sort.protocol.node.Node;
+import org.junit.Test;
+
+public class Elasticsearch7SqlParseTest extends ElasticsearchSqlParseTest {
+
+    @Test
+    public void testMysqlToElasticsearch7() throws  Exception {
+        Node outputNode = this.buildElasticsearchLoadNode(7);
+        this.testMysqlToElasticsearch(outputNode);
+    }
+}
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ElasticsearchSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ElasticsearchSqlParseTest.java
index 4ed00fb72..c7e6251e9 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ElasticsearchSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ElasticsearchSqlParseTest.java
@@ -41,12 +41,11 @@ import org.apache.inlong.sort.protocol.node.load.ElasticsearchLoadNode;
 import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
 import org.junit.Assert;
-import org.junit.Test;
 
 /**
  * test elastic search sql parse
  */
-public class ElasticsearchSqlParseTest extends AbstractTestBase {
+public abstract class ElasticsearchSqlParseTest extends AbstractTestBase {
 
     private MySqlExtractNode buildMysqlExtractNode() {
         List<FieldInfo> fields = Arrays.asList(new FieldInfo("age", new StringFormatInfo()),
@@ -59,7 +58,13 @@ public class ElasticsearchSqlParseTest extends AbstractTestBase {
             true, null);
     }
 
-    private ElasticsearchLoadNode buildElasticsearchLoadNode() {
+    /**
+     * Build elasticsearch node
+     *
+     * @param version version number
+     * @return ElasticsearchLoadNode
+     */
+    ElasticsearchLoadNode buildElasticsearchLoadNode(int version) {
         List<FieldInfo> fields = Arrays.asList(new FieldInfo("age", new StringFormatInfo()),
             new FieldInfo("name", new StringFormatInfo()));
         List<FieldRelation> relations = Arrays
@@ -72,7 +77,7 @@ public class ElasticsearchSqlParseTest extends AbstractTestBase {
         return new ElasticsearchLoadNode("2", "kafka_output", fields, relations, null, null,
             2, null,
             "test", "http://localhost:9200",
-            "elastic", "my_password", null, "age", 7);
+            "elastic", "my_password", null, "age", version);
     }
 
     private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
@@ -86,8 +91,7 @@ public class ElasticsearchSqlParseTest extends AbstractTestBase {
      *
      * @throws Exception The exception may throws when execute the case
      */
-    @Test
-    public void testMysqlToElasticsearch() throws Exception {
+    public void testMysqlToElasticsearch(Node node) throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
             .newInstance()
             .useBlinkPlanner()
@@ -98,7 +102,7 @@ public class ElasticsearchSqlParseTest extends AbstractTestBase {
         env.enableCheckpointing(10000);
         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
         Node inputNode = buildMysqlExtractNode();
-        Node outputNode = buildElasticsearchLoadNode();
+        Node outputNode = node;
         StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, outputNode),
             Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
                 Collections.singletonList(outputNode))));
diff --git a/pom.xml b/pom.xml
index b07ff3d07..35c0423f0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -111,6 +111,7 @@
         <plugin.failsafe.version>3.0.0-M7</plugin.failsafe.version>
         <plugin.shade.version>3.2.4</plugin.shade.version>
         <plugin.maven.source>3.0.1</plugin.maven.source>
+        <plugin.maven.jar.version>3.2.0</plugin.maven.jar.version>
         <exec.maven.version>1.6.0</exec.maven.version>
         <build.helper.maven.version>3.0.0</build.helper.maven.version>