You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by ab...@apache.org on 2023/05/12 00:55:56 UTC
[incubator-devlake] branch main updated: feat: collect feishu chat(group) & message (#5001)
This is an automated email from the ASF dual-hosted git repository.
abeizn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new b51b3fb07 feat: collect feishu chat(group) & message (#5001)
b51b3fb07 is described below
commit b51b3fb07861dd7c94f064d7118f83a4e2f62f2d
Author: Likyh <ya...@meri.co>
AuthorDate: Fri May 12 08:55:51 2023 +0800
feat: collect feishu chat(group) & message (#5001)
* feat: collect feishu chat(group) & message
* fix: fix for ci
* fix: delete a unused code
---
backend/plugins/feishu/apimodels/im_result.go | 58 ++++++++++
backend/plugins/feishu/impl/impl.go | 6 ++
.../{tasks/task_data.go => models/chat_item.go} | 29 ++---
backend/plugins/feishu/models/message.go | 45 ++++++++
..._init_tables.go => 20230421_add_init_tables.go} | 10 +-
.../migrationscripts/archived/chat_item.go} | 29 ++---
.../models/migrationscripts/archived/message.go | 45 ++++++++
backend/plugins/feishu/tasks/api_client.go | 4 +-
backend/plugins/feishu/tasks/chat_collector.go | 92 ++++++++++++++++
.../tasks/{api_client.go => chat_extractor.go} | 45 +++++---
backend/plugins/feishu/tasks/message_collector.go | 119 +++++++++++++++++++++
backend/plugins/feishu/tasks/message_extractor.go | 87 +++++++++++++++
backend/plugins/feishu/tasks/task_data.go | 6 +-
13 files changed, 526 insertions(+), 49 deletions(-)
diff --git a/backend/plugins/feishu/apimodels/im_result.go b/backend/plugins/feishu/apimodels/im_result.go
new file mode 100644
index 000000000..2f900dbff
--- /dev/null
+++ b/backend/plugins/feishu/apimodels/im_result.go
@@ -0,0 +1,58 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package apimodels
+
+import "encoding/json"
+
+type FeishuImApiResult struct {
+ Code int `json:"code"`
+ Data struct {
+ HasMore bool `json:"has_more"`
+ Items []json.RawMessage `json:"items"`
+ PageToken string `json:"page_token"`
+ } `json:"data"`
+ Msg string `json:"msg"`
+}
+
+type FeishuMessageResultItem struct {
+ Body struct {
+ Content string `json:"content"`
+ } `json:"body"`
+ ChatId string `json:"chat_id"`
+ CreateTime string `json:"create_time"`
+ Deleted bool `json:"deleted"`
+ Mentions []struct {
+ Id string `json:"id"`
+ IdType string `json:"id_type"`
+ Key string `json:"key"`
+ Name string `json:"name"`
+ TenantKey string `json:"tenant_key"`
+ } `json:"mentions"`
+ MessageId string `json:"message_id"`
+ MsgType string `json:"msg_type"`
+ ParentId string `json:"parent_id"`
+ RootId string `json:"root_id"`
+ Sender struct {
+ Id string `json:"id"`
+ IdType string `json:"id_type"`
+ SenderType string `json:"sender_type"`
+ TenantKey string `json:"tenant_key"`
+ } `json:"sender"`
+ UpdateTime string `json:"update_time"`
+ Updated bool `json:"updated"`
+}
diff --git a/backend/plugins/feishu/impl/impl.go b/backend/plugins/feishu/impl/impl.go
index 3f6952fea..113463a5d 100644
--- a/backend/plugins/feishu/impl/impl.go
+++ b/backend/plugins/feishu/impl/impl.go
@@ -59,6 +59,12 @@ func (p Feishu) Description() string {
func (p Feishu) SubTaskMetas() []plugin.SubTaskMeta {
return []plugin.SubTaskMeta{
+ tasks.CollectChatMeta,
+ tasks.ExtractChatItemMeta,
+
+ tasks.CollectMessageMeta,
+ tasks.ExtractMessageMeta,
+
tasks.CollectMeetingTopUserItemMeta,
tasks.ExtractMeetingTopUserItemMeta,
}
diff --git a/backend/plugins/feishu/tasks/task_data.go b/backend/plugins/feishu/models/chat_item.go
similarity index 55%
copy from backend/plugins/feishu/tasks/task_data.go
copy to backend/plugins/feishu/models/chat_item.go
index 79099bc6a..8e80dd312 100644
--- a/backend/plugins/feishu/tasks/task_data.go
+++ b/backend/plugins/feishu/models/chat_item.go
@@ -15,24 +15,25 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package tasks
+package models
import (
- helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
- "github.com/apache/incubator-devlake/plugins/feishu/models"
+ "github.com/apache/incubator-devlake/core/models/common"
)
-type FeishuApiParams struct {
- ConnectionId uint64 `json:"connectionId"`
+type FeishuChatItem struct {
+ common.NoPKModel `json:"-"`
+ ConnectionId uint64 `gorm:"primaryKey"`
+ ChatId string `json:"chat_id" gorm:"primaryKey"`
+ Avatar string `json:"avatar"`
+ Description string `json:"description"`
+ External bool `json:"external"`
+ Name string `json:"name"`
+ OwnerId string `json:"owner_id"`
+ OwnerIdType string `json:"owner_id_type"`
+ TenantKey string `json:"tenant_key"`
}
-type FeishuOptions struct {
- ConnectionId uint64 `json:"connectionId"`
- NumOfDaysToCollect float64 `json:"numOfDaysToCollect"`
-}
-
-type FeishuTaskData struct {
- Options *FeishuOptions
- ApiClient *helper.ApiAsyncClient
- FeishuMeetingTopUserItem *models.FeishuMeetingTopUserItem
+func (FeishuChatItem) TableName() string {
+ return "_tool_feishu_chats"
}
diff --git a/backend/plugins/feishu/models/message.go b/backend/plugins/feishu/models/message.go
new file mode 100644
index 000000000..e2a42208f
--- /dev/null
+++ b/backend/plugins/feishu/models/message.go
@@ -0,0 +1,45 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package models
+
+import (
+ "github.com/apache/incubator-devlake/core/models/common"
+ "time"
+)
+
+type FeishuMessage struct {
+ common.NoPKModel `json:"-"`
+ ConnectionId uint64 `gorm:"primaryKey"`
+ MessageId string `json:"message_id" gorm:"primaryKey"`
+ Content string `json:"content"`
+ ChatId string `json:"chat_id"`
+ MsgType string `json:"msg_type"`
+ ParentId string `json:"parent_id"`
+ RootId string `json:"root_id"`
+ SenderId string `json:"id"`
+ SenderIdType string `json:"id_type"`
+ SenderType string `json:"sender_type"`
+ Deleted bool `json:"deleted"`
+ CreateTime time.Time `json:"create_time"`
+ UpdateTime time.Time `json:"update_time"`
+ Updated bool `json:"updated"`
+}
+
+func (FeishuMessage) TableName() string {
+ return "_tool_feishu_messages"
+}
diff --git a/backend/plugins/feishu/models/migrationscripts/20220714_add_init_tables.go b/backend/plugins/feishu/models/migrationscripts/20230421_add_init_tables.go
similarity index 89%
rename from backend/plugins/feishu/models/migrationscripts/20220714_add_init_tables.go
rename to backend/plugins/feishu/models/migrationscripts/20230421_add_init_tables.go
index e143e59f0..b111e43dc 100644
--- a/backend/plugins/feishu/models/migrationscripts/20220714_add_init_tables.go
+++ b/backend/plugins/feishu/models/migrationscripts/20230421_add_init_tables.go
@@ -34,6 +34,8 @@ func (u *addInitTables) Up(basicRes context.BasicRes) errors.Error {
err := db.DropTables(
&archived.FeishuConnection{},
&archived.FeishuMeetingTopUserItem{},
+ &archived.FeishuChatItem{},
+ &archived.FeishuMessage{},
)
if err != nil {
return err
@@ -43,6 +45,8 @@ func (u *addInitTables) Up(basicRes context.BasicRes) errors.Error {
basicRes,
&archived.FeishuConnection{},
&archived.FeishuMeetingTopUserItem{},
+ &archived.FeishuChatItem{},
+ &archived.FeishuMessage{},
)
if err != nil {
return err
@@ -55,6 +59,10 @@ func (u *addInitTables) Up(basicRes context.BasicRes) errors.Error {
connection.SecretKey = basicRes.GetConfig(`FEISHU_APPSCRECT`)
connection.Name = `Feishu`
if connection.Endpoint != `` && connection.AppId != `` && connection.SecretKey != `` && encodeKey != `` {
+ connection.SecretKey, err = plugin.Encrypt(encodeKey, connection.SecretKey)
+ if err != nil {
+ return err
+ }
// update from .env and save to db
err = db.CreateIfNotExist(connection)
if err != nil {
@@ -65,7 +73,7 @@ func (u *addInitTables) Up(basicRes context.BasicRes) errors.Error {
}
func (*addInitTables) Version() uint64 {
- return 20220714000001
+ return 20230421000001
}
func (*addInitTables) Name() string {
diff --git a/backend/plugins/feishu/tasks/task_data.go b/backend/plugins/feishu/models/migrationscripts/archived/chat_item.go
similarity index 54%
copy from backend/plugins/feishu/tasks/task_data.go
copy to backend/plugins/feishu/models/migrationscripts/archived/chat_item.go
index 79099bc6a..fc5669990 100644
--- a/backend/plugins/feishu/tasks/task_data.go
+++ b/backend/plugins/feishu/models/migrationscripts/archived/chat_item.go
@@ -15,24 +15,25 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package tasks
+package archived
import (
- helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
- "github.com/apache/incubator-devlake/plugins/feishu/models"
+ "github.com/apache/incubator-devlake/core/models/migrationscripts/archived"
)
-type FeishuApiParams struct {
- ConnectionId uint64 `json:"connectionId"`
+type FeishuChatItem struct {
+ archived.NoPKModel `json:"-"`
+ ConnectionId uint64 `gorm:"primaryKey"`
+ ChatId string `json:"chat_id" gorm:"primaryKey"`
+ Avatar string `json:"avatar"`
+ Description string `json:"description"`
+ External bool `json:"external"`
+ Name string `json:"name"`
+ OwnerId string `json:"owner_id"`
+ OwnerIdType string `json:"owner_id_type"`
+ TenantKey string `json:"tenant_key"`
}
-type FeishuOptions struct {
- ConnectionId uint64 `json:"connectionId"`
- NumOfDaysToCollect float64 `json:"numOfDaysToCollect"`
-}
-
-type FeishuTaskData struct {
- Options *FeishuOptions
- ApiClient *helper.ApiAsyncClient
- FeishuMeetingTopUserItem *models.FeishuMeetingTopUserItem
+func (FeishuChatItem) TableName() string {
+ return "_tool_feishu_chats"
}
diff --git a/backend/plugins/feishu/models/migrationscripts/archived/message.go b/backend/plugins/feishu/models/migrationscripts/archived/message.go
new file mode 100644
index 000000000..e8bebebd3
--- /dev/null
+++ b/backend/plugins/feishu/models/migrationscripts/archived/message.go
@@ -0,0 +1,45 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package archived
+
+import (
+ "github.com/apache/incubator-devlake/core/models/migrationscripts/archived"
+ "time"
+)
+
+type FeishuMessage struct {
+ archived.NoPKModel `json:"-"`
+ ConnectionId uint64 `gorm:"primaryKey"`
+ MessageId string `json:"message_id" gorm:"primaryKey"`
+ Content string `json:"content"`
+ ChatId string `json:"chat_id"`
+ MsgType string `json:"msg_type"`
+ ParentId string `json:"parent_id"`
+ RootId string `json:"root_id"`
+ SenderId string `json:"id"`
+ SenderIdType string `json:"id_type"`
+ SenderType string `json:"sender_type"`
+ Deleted bool `json:"deleted"`
+ CreateTime time.Time `json:"create_time"`
+ UpdateTime time.Time `json:"update_time"`
+ Updated bool `json:"updated"`
+}
+
+func (FeishuMessage) TableName() string {
+ return "_tool_feishu_messages"
+}
diff --git a/backend/plugins/feishu/tasks/api_client.go b/backend/plugins/feishu/tasks/api_client.go
index e015056d3..6bb3ed9fb 100644
--- a/backend/plugins/feishu/tasks/api_client.go
+++ b/backend/plugins/feishu/tasks/api_client.go
@@ -34,12 +34,12 @@ func NewFeishuApiClient(taskCtx plugin.TaskContext, connection *models.FeishuCon
}
// create async api client
- asyncApiCLient, err := api.CreateAsyncApiClient(taskCtx, apiClient, &api.ApiRateLimitCalculator{
+ asyncApiClient, err := api.CreateAsyncApiClient(taskCtx, apiClient, &api.ApiRateLimitCalculator{
UserRateLimitPerHour: connection.RateLimitPerHour,
})
if err != nil {
return nil, err
}
- return asyncApiCLient, nil
+ return asyncApiClient, nil
}
diff --git a/backend/plugins/feishu/tasks/chat_collector.go b/backend/plugins/feishu/tasks/chat_collector.go
new file mode 100644
index 000000000..f76e8c022
--- /dev/null
+++ b/backend/plugins/feishu/tasks/chat_collector.go
@@ -0,0 +1,92 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tasks
+
+import (
+ "encoding/json"
+ "net/http"
+ "net/url"
+ "strconv"
+
+ "github.com/apache/incubator-devlake/core/errors"
+ "github.com/apache/incubator-devlake/core/plugin"
+ "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
+ "github.com/apache/incubator-devlake/plugins/feishu/apimodels"
+)
+
+const RAW_CHAT_TABLE = "feishu_chat_item"
+
+var _ plugin.SubTaskEntryPoint = CollectChat
+
+// CollectChat collects all chats that the bot is in, paging through im/v1/chats into the RAW_CHAT_TABLE raw table.
+func CollectChat(taskCtx plugin.SubTaskContext) errors.Error {
+	data := taskCtx.GetData().(*FeishuTaskData)
+	pageSize := 50
+	collector, err := api.NewApiCollector(api.ApiCollectorArgs{
+		RawDataSubTaskArgs: api.RawDataSubTaskArgs{
+			Ctx: taskCtx,
+			Params: FeishuApiParams{
+				ConnectionId: data.Options.ConnectionId,
+			},
+			Table: RAW_CHAT_TABLE,
+		},
+		ApiClient:   data.ApiClient,
+		Incremental: false,
+		UrlTemplate: "im/v1/chats",
+		PageSize:    pageSize,
+		GetNextPageCustomData: func(prevReqData *api.RequestData, prevPageResponse *http.Response) (interface{}, errors.Error) {
+			res := apimodels.FeishuImApiResult{}
+			err := api.UnmarshalResponse(prevPageResponse, &res)
+			if err != nil {
+				return nil, err
+			}
+			if !res.Data.HasMore {
+				return nil, api.ErrFinishCollect // no further pages reported by the API
+			}
+			return res.Data.PageToken, nil
+		},
+		Query: func(reqData *api.RequestData) (url.Values, errors.Error) {
+			query := url.Values{}
+			query.Set("page_size", strconv.Itoa(pageSize))
+			if pageToken, ok := reqData.CustomData.(string); ok && pageToken != "" {
+				query.Set("page_token", pageToken) // reuse the already-checked value
+			}
+			return query, nil
+		},
+		ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
+			body := &apimodels.FeishuImApiResult{}
+			err := api.UnmarshalResponse(res, body)
+			if err != nil {
+				return nil, err
+			}
+			return body.Data.Items, nil // each item is stored undecoded as one raw row
+		},
+	})
+	if err != nil {
+		return err
+	}
+
+	return collector.Execute()
+}
+
+var CollectChatMeta = plugin.SubTaskMeta{
+ Name: "collectChat",
+ EntryPoint: CollectChat,
+ EnabledByDefault: true,
+ Description: "Collect chats from Feishu api",
+}
diff --git a/backend/plugins/feishu/tasks/api_client.go b/backend/plugins/feishu/tasks/chat_extractor.go
similarity index 51%
copy from backend/plugins/feishu/tasks/api_client.go
copy to backend/plugins/feishu/tasks/chat_extractor.go
index e015056d3..abad57fe5 100644
--- a/backend/plugins/feishu/tasks/api_client.go
+++ b/backend/plugins/feishu/tasks/chat_extractor.go
@@ -18,28 +18,45 @@ limitations under the License.
package tasks
import (
+ "encoding/json"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/plugin"
"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
"github.com/apache/incubator-devlake/plugins/feishu/models"
)
-// const AUTH_ENDPOINT = "https://open.feishu.cn"
-// const ENDPOINT = "https://open.feishu.cn/open-apis/vc/v1"
-
-func NewFeishuApiClient(taskCtx plugin.TaskContext, connection *models.FeishuConnection) (*api.ApiAsyncClient, errors.Error) {
- apiClient, err := api.NewApiClientFromConnection(taskCtx.GetContext(), taskCtx, connection)
- if err != nil {
- return nil, err
- }
-
- // create async api client
- asyncApiCLient, err := api.CreateAsyncApiClient(taskCtx, apiClient, &api.ApiRateLimitCalculator{
- UserRateLimitPerHour: connection.RateLimitPerHour,
+var _ plugin.SubTaskEntryPoint = ExtractChatItem
+
+func ExtractChatItem(taskCtx plugin.SubTaskContext) errors.Error {
+ data := taskCtx.GetData().(*FeishuTaskData)
+ extractor, err := api.NewApiExtractor(api.ApiExtractorArgs{
+ RawDataSubTaskArgs: api.RawDataSubTaskArgs{
+ Ctx: taskCtx,
+ Params: FeishuApiParams{
+ ConnectionId: data.Options.ConnectionId,
+ },
+ Table: RAW_CHAT_TABLE,
+ },
+ Extract: func(row *api.RawData) ([]interface{}, errors.Error) {
+ body := &models.FeishuChatItem{}
+ err := errors.Convert(json.Unmarshal(row.Data, body))
+ if err != nil {
+ return nil, err
+ }
+ body.ConnectionId = data.Options.ConnectionId
+ return []interface{}{body}, nil
+ },
})
if err != nil {
- return nil, err
+ return err
}
- return asyncApiCLient, nil
+ return extractor.Execute()
+}
+
+var ExtractChatItemMeta = plugin.SubTaskMeta{
+	Name:             "extractChatItem",
+	EntryPoint:       ExtractChatItem,
+	EnabledByDefault: true,
+	Description:      "Extract raw chats data into tool layer table _tool_feishu_chats", // FeishuChatItem.TableName()
}
diff --git a/backend/plugins/feishu/tasks/message_collector.go b/backend/plugins/feishu/tasks/message_collector.go
new file mode 100644
index 000000000..3823f7f09
--- /dev/null
+++ b/backend/plugins/feishu/tasks/message_collector.go
@@ -0,0 +1,119 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tasks
+
+import (
+ "encoding/json"
+ "github.com/apache/incubator-devlake/core/dal"
+ "github.com/apache/incubator-devlake/core/errors"
+ "github.com/apache/incubator-devlake/core/plugin"
+ "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
+ "github.com/apache/incubator-devlake/plugins/feishu/apimodels"
+ "net/http"
+ "net/url"
+ "reflect"
+ "strconv"
+)
+
+const RAW_MESSAGE_TABLE = "feishu_message"
+
+var _ plugin.SubTaskEntryPoint = CollectMessage
+
+type ChatInput struct {
+ ChatId string `json:"chat_id"`
+}
+
+func CollectMessage(taskCtx plugin.SubTaskContext) errors.Error { // collects im/v1/messages for every stored chat
+	data := taskCtx.GetData().(*FeishuTaskData)
+	db := taskCtx.GetDal()
+
+	clauses := []dal.Clause{
+		dal.Select("chat_id AS chat_id"),
+		dal.From("_tool_feishu_chats"),
+		dal.Where("connection_id=?", data.Options.ConnectionId),
+	}
+
+	// Build the input iterator: one ChatInput per chat already extracted for this connection.
+	cursor, err := db.Cursor(clauses...)
+	if err != nil {
+		return err
+	}
+	// Only chat_id is selected, so rows are scanned into the small ChatInput struct to keep memory use low.
+	iterator, err := api.NewDalCursorIterator(db, cursor, reflect.TypeOf(ChatInput{}))
+	if err != nil {
+		return err
+	}
+
+	pageSize := 50
+	collector, err := api.NewApiCollector(api.ApiCollectorArgs{
+		RawDataSubTaskArgs: api.RawDataSubTaskArgs{
+			Ctx: taskCtx,
+			Params: FeishuApiParams{
+				ConnectionId: data.Options.ConnectionId,
+			},
+			Table: RAW_MESSAGE_TABLE,
+		},
+		ApiClient:   data.ApiClient,
+		Incremental: false,
+		Input:       iterator,
+		UrlTemplate: "im/v1/messages",
+		PageSize:    pageSize,
+		GetNextPageCustomData: func(prevReqData *api.RequestData, prevPageResponse *http.Response) (interface{}, errors.Error) {
+			res := apimodels.FeishuImApiResult{}
+			err := api.UnmarshalResponse(prevPageResponse, &res)
+			if err != nil {
+				return nil, err
+			}
+			if !res.Data.HasMore {
+				return nil, api.ErrFinishCollect // no further pages reported for this chat
+			}
+			return res.Data.PageToken, nil
+		},
+		Query: func(reqData *api.RequestData) (url.Values, errors.Error) {
+			input := reqData.Input.(*ChatInput)
+			query := url.Values{}
+			query.Set("container_id_type", "chat")
+			query.Set("container_id", input.ChatId)
+			query.Set("page_size", strconv.Itoa(pageSize))
+			if pageToken, ok := reqData.CustomData.(string); ok && pageToken != "" {
+				query.Set("page_token", pageToken) // reuse the already-checked value
+			}
+			return query, nil
+		},
+		ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
+			body := &apimodels.FeishuImApiResult{}
+			err := api.UnmarshalResponse(res, body)
+			if err != nil {
+				return nil, err
+			}
+			return body.Data.Items, nil // each item is stored undecoded as one raw row
+		},
+	})
+	if err != nil {
+		return err
+	}
+
+	return collector.Execute()
+}
+
+var CollectMessageMeta = plugin.SubTaskMeta{
+	Name:             "collectMessage",
+	EntryPoint:       CollectMessage,
+	EnabledByDefault: true,
+	Description:      "Collect message from Feishu api",
+}
diff --git a/backend/plugins/feishu/tasks/message_extractor.go b/backend/plugins/feishu/tasks/message_extractor.go
new file mode 100644
index 000000000..f075b61f1
--- /dev/null
+++ b/backend/plugins/feishu/tasks/message_extractor.go
@@ -0,0 +1,87 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tasks
+
+import (
+ "encoding/json"
+ "github.com/apache/incubator-devlake/core/errors"
+ "github.com/apache/incubator-devlake/core/plugin"
+ "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
+ "github.com/apache/incubator-devlake/plugins/feishu/apimodels"
+ "github.com/apache/incubator-devlake/plugins/feishu/models"
+ "strconv"
+ "time"
+)
+
+var _ plugin.SubTaskEntryPoint = ExtractMessage
+
+func ExtractMessage(taskCtx plugin.SubTaskContext) errors.Error { // raw message rows -> models.FeishuMessage
+	data := taskCtx.GetData().(*FeishuTaskData)
+	extractor, err := api.NewApiExtractor(api.ApiExtractorArgs{
+		RawDataSubTaskArgs: api.RawDataSubTaskArgs{
+			Ctx: taskCtx,
+			Params: FeishuApiParams{
+				ConnectionId: data.Options.ConnectionId,
+			},
+			Table: RAW_MESSAGE_TABLE,
+		},
+		Extract: func(row *api.RawData) ([]interface{}, errors.Error) {
+			body := &apimodels.FeishuMessageResultItem{}
+			err := errors.Convert(json.Unmarshal(row.Data, body))
+			if err != nil {
+				return nil, err
+			}
+			message := &models.FeishuMessage{}
+			message.ConnectionId = data.Options.ConnectionId
+			message.MessageId = body.MessageId
+			message.Content = body.Body.Content // flatten the nested body.content
+			message.ChatId = body.ChatId
+			message.MsgType = body.MsgType
+			message.ParentId = body.ParentId
+			message.RootId = body.RootId
+			message.SenderId = body.Sender.Id // flatten the nested sender object
+			message.SenderIdType = body.Sender.IdType
+			message.SenderType = body.Sender.SenderType
+			message.Deleted = body.Deleted
+			createTimestamp, err := errors.Convert01(strconv.ParseInt(body.CreateTime, 10, 64)) // int64: ms epochs overflow a 32-bit int
+			if err != nil {
+				return nil, err
+			}
+			message.CreateTime = time.UnixMilli(createTimestamp)
+			updateTimestamp, err := errors.Convert01(strconv.ParseInt(body.UpdateTime, 10, 64))
+			if err != nil {
+				return nil, err
+			}
+			message.UpdateTime = time.UnixMilli(updateTimestamp)
+			message.Updated = body.Updated
+			return []interface{}{message}, nil
+		},
+	})
+	if err != nil {
+		return err
+	}
+
+	return extractor.Execute()
+}
+
+var ExtractMessageMeta = plugin.SubTaskMeta{
+	Name:             "extractMessage", // distinct from ExtractChatItemMeta's "extractChatItem"
+	EntryPoint:       ExtractMessage,
+	EnabledByDefault: true,
+	Description:      "Extract raw messages data into tool layer table _tool_feishu_messages", // FeishuMessage.TableName()
+}
diff --git a/backend/plugins/feishu/tasks/task_data.go b/backend/plugins/feishu/tasks/task_data.go
index 79099bc6a..428c06739 100644
--- a/backend/plugins/feishu/tasks/task_data.go
+++ b/backend/plugins/feishu/tasks/task_data.go
@@ -19,7 +19,6 @@ package tasks
import (
helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
- "github.com/apache/incubator-devlake/plugins/feishu/models"
)
type FeishuApiParams struct {
@@ -32,7 +31,6 @@ type FeishuOptions struct {
}
type FeishuTaskData struct {
- Options *FeishuOptions
- ApiClient *helper.ApiAsyncClient
- FeishuMeetingTopUserItem *models.FeishuMeetingTopUserItem
+ Options *FeishuOptions
+ ApiClient *helper.ApiAsyncClient
}