You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by ab...@apache.org on 2023/05/12 00:55:56 UTC

[incubator-devlake] branch main updated: feat: collect feishu chat(group) & message (#5001)

This is an automated email from the ASF dual-hosted git repository.

abeizn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new b51b3fb07 feat: collect feishu chat(group) & message (#5001)
b51b3fb07 is described below

commit b51b3fb07861dd7c94f064d7118f83a4e2f62f2d
Author: Likyh <ya...@meri.co>
AuthorDate: Fri May 12 08:55:51 2023 +0800

    feat: collect feishu chat(group) & message (#5001)
    
    * feat: collect feishu chat(group) & message
    
    * fix: fix for ci
    
    * fix: delete a unused code
---
 backend/plugins/feishu/apimodels/im_result.go      |  58 ++++++++++
 backend/plugins/feishu/impl/impl.go                |   6 ++
 .../{tasks/task_data.go => models/chat_item.go}    |  29 ++---
 backend/plugins/feishu/models/message.go           |  45 ++++++++
 ..._init_tables.go => 20230421_add_init_tables.go} |  10 +-
 .../migrationscripts/archived/chat_item.go}        |  29 ++---
 .../models/migrationscripts/archived/message.go    |  45 ++++++++
 backend/plugins/feishu/tasks/api_client.go         |   4 +-
 backend/plugins/feishu/tasks/chat_collector.go     |  92 ++++++++++++++++
 .../tasks/{api_client.go => chat_extractor.go}     |  45 +++++---
 backend/plugins/feishu/tasks/message_collector.go  | 119 +++++++++++++++++++++
 backend/plugins/feishu/tasks/message_extractor.go  |  87 +++++++++++++++
 backend/plugins/feishu/tasks/task_data.go          |   6 +-
 13 files changed, 526 insertions(+), 49 deletions(-)

diff --git a/backend/plugins/feishu/apimodels/im_result.go b/backend/plugins/feishu/apimodels/im_result.go
new file mode 100644
index 000000000..2f900dbff
--- /dev/null
+++ b/backend/plugins/feishu/apimodels/im_result.go
@@ -0,0 +1,58 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package apimodels
+
+import "encoding/json"
+
+type FeishuImApiResult struct {
+	Code int `json:"code"`
+	Data struct {
+		HasMore   bool              `json:"has_more"`
+		Items     []json.RawMessage `json:"items"`
+		PageToken string            `json:"page_token"`
+	} `json:"data"`
+	Msg string `json:"msg"`
+}
+
+type FeishuMessageResultItem struct {
+	Body struct {
+		Content string `json:"content"`
+	} `json:"body"`
+	ChatId     string `json:"chat_id"`
+	CreateTime string `json:"create_time"`
+	Deleted    bool   `json:"deleted"`
+	Mentions   []struct {
+		Id        string `json:"id"`
+		IdType    string `json:"id_type"`
+		Key       string `json:"key"`
+		Name      string `json:"name"`
+		TenantKey string `json:"tenant_key"`
+	} `json:"mentions"`
+	MessageId string `json:"message_id"`
+	MsgType   string `json:"msg_type"`
+	ParentId  string `json:"parent_id"`
+	RootId    string `json:"root_id"`
+	Sender    struct {
+		Id         string `json:"id"`
+		IdType     string `json:"id_type"`
+		SenderType string `json:"sender_type"`
+		TenantKey  string `json:"tenant_key"`
+	} `json:"sender"`
+	UpdateTime string `json:"update_time"`
+	Updated    bool   `json:"updated"`
+}
diff --git a/backend/plugins/feishu/impl/impl.go b/backend/plugins/feishu/impl/impl.go
index 3f6952fea..113463a5d 100644
--- a/backend/plugins/feishu/impl/impl.go
+++ b/backend/plugins/feishu/impl/impl.go
@@ -59,6 +59,12 @@ func (p Feishu) Description() string {
 
 func (p Feishu) SubTaskMetas() []plugin.SubTaskMeta {
 	return []plugin.SubTaskMeta{
+		tasks.CollectChatMeta,
+		tasks.ExtractChatItemMeta,
+
+		tasks.CollectMessageMeta,
+		tasks.ExtractMessageMeta,
+
 		tasks.CollectMeetingTopUserItemMeta,
 		tasks.ExtractMeetingTopUserItemMeta,
 	}
diff --git a/backend/plugins/feishu/tasks/task_data.go b/backend/plugins/feishu/models/chat_item.go
similarity index 55%
copy from backend/plugins/feishu/tasks/task_data.go
copy to backend/plugins/feishu/models/chat_item.go
index 79099bc6a..8e80dd312 100644
--- a/backend/plugins/feishu/tasks/task_data.go
+++ b/backend/plugins/feishu/models/chat_item.go
@@ -15,24 +15,25 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package tasks
+package models
 
 import (
-	helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
-	"github.com/apache/incubator-devlake/plugins/feishu/models"
+	"github.com/apache/incubator-devlake/core/models/common"
 )
 
-type FeishuApiParams struct {
-	ConnectionId uint64 `json:"connectionId"`
+type FeishuChatItem struct {
+	common.NoPKModel `json:"-"`
+	ConnectionId     uint64 `gorm:"primaryKey"`
+	ChatId           string `json:"chat_id" gorm:"primaryKey"`
+	Avatar           string `json:"avatar"`
+	Description      string `json:"description"`
+	External         bool   `json:"external"`
+	Name             string `json:"name"`
+	OwnerId          string `json:"owner_id"`
+	OwnerIdType      string `json:"owner_id_type"`
+	TenantKey        string `json:"tenant_key"`
 }
 
-type FeishuOptions struct {
-	ConnectionId       uint64  `json:"connectionId"`
-	NumOfDaysToCollect float64 `json:"numOfDaysToCollect"`
-}
-
-type FeishuTaskData struct {
-	Options                  *FeishuOptions
-	ApiClient                *helper.ApiAsyncClient
-	FeishuMeetingTopUserItem *models.FeishuMeetingTopUserItem
+func (FeishuChatItem) TableName() string {
+	return "_tool_feishu_chats"
 }
diff --git a/backend/plugins/feishu/models/message.go b/backend/plugins/feishu/models/message.go
new file mode 100644
index 000000000..e2a42208f
--- /dev/null
+++ b/backend/plugins/feishu/models/message.go
@@ -0,0 +1,45 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package models
+
+import (
+	"github.com/apache/incubator-devlake/core/models/common"
+	"time"
+)
+
+type FeishuMessage struct {
+	common.NoPKModel `json:"-"`
+	ConnectionId     uint64    `gorm:"primaryKey"`
+	MessageId        string    `json:"message_id" gorm:"primaryKey"`
+	Content          string    `json:"content"`
+	ChatId           string    `json:"chat_id"`
+	MsgType          string    `json:"msg_type"`
+	ParentId         string    `json:"parent_id"`
+	RootId           string    `json:"root_id"`
+	SenderId         string    `json:"id"`
+	SenderIdType     string    `json:"id_type"`
+	SenderType       string    `json:"sender_type"`
+	Deleted          bool      `json:"deleted"`
+	CreateTime       time.Time `json:"create_time"`
+	UpdateTime       time.Time `json:"update_time"`
+	Updated          bool      `json:"updated"`
+}
+
+func (FeishuMessage) TableName() string {
+	return "_tool_feishu_messages"
+}
diff --git a/backend/plugins/feishu/models/migrationscripts/20220714_add_init_tables.go b/backend/plugins/feishu/models/migrationscripts/20230421_add_init_tables.go
similarity index 89%
rename from backend/plugins/feishu/models/migrationscripts/20220714_add_init_tables.go
rename to backend/plugins/feishu/models/migrationscripts/20230421_add_init_tables.go
index e143e59f0..b111e43dc 100644
--- a/backend/plugins/feishu/models/migrationscripts/20220714_add_init_tables.go
+++ b/backend/plugins/feishu/models/migrationscripts/20230421_add_init_tables.go
@@ -34,6 +34,8 @@ func (u *addInitTables) Up(basicRes context.BasicRes) errors.Error {
 	err := db.DropTables(
 		&archived.FeishuConnection{},
 		&archived.FeishuMeetingTopUserItem{},
+		&archived.FeishuChatItem{},
+		&archived.FeishuMessage{},
 	)
 	if err != nil {
 		return err
@@ -43,6 +45,8 @@ func (u *addInitTables) Up(basicRes context.BasicRes) errors.Error {
 		basicRes,
 		&archived.FeishuConnection{},
 		&archived.FeishuMeetingTopUserItem{},
+		&archived.FeishuChatItem{},
+		&archived.FeishuMessage{},
 	)
 	if err != nil {
 		return err
@@ -55,6 +59,10 @@ func (u *addInitTables) Up(basicRes context.BasicRes) errors.Error {
 	connection.SecretKey = basicRes.GetConfig(`FEISHU_APPSCRECT`)
 	connection.Name = `Feishu`
 	if connection.Endpoint != `` && connection.AppId != `` && connection.SecretKey != `` && encodeKey != `` {
+		connection.SecretKey, err = plugin.Encrypt(encodeKey, connection.SecretKey)
+		if err != nil {
+			return err
+		}
 		// update from .env and save to db
 		err = db.CreateIfNotExist(connection)
 		if err != nil {
@@ -65,7 +73,7 @@ func (u *addInitTables) Up(basicRes context.BasicRes) errors.Error {
 }
 
 func (*addInitTables) Version() uint64 {
-	return 20220714000001
+	return 20230421000001
 }
 
 func (*addInitTables) Name() string {
diff --git a/backend/plugins/feishu/tasks/task_data.go b/backend/plugins/feishu/models/migrationscripts/archived/chat_item.go
similarity index 54%
copy from backend/plugins/feishu/tasks/task_data.go
copy to backend/plugins/feishu/models/migrationscripts/archived/chat_item.go
index 79099bc6a..fc5669990 100644
--- a/backend/plugins/feishu/tasks/task_data.go
+++ b/backend/plugins/feishu/models/migrationscripts/archived/chat_item.go
@@ -15,24 +15,25 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package tasks
+package archived
 
 import (
-	helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
-	"github.com/apache/incubator-devlake/plugins/feishu/models"
+	"github.com/apache/incubator-devlake/core/models/migrationscripts/archived"
 )
 
-type FeishuApiParams struct {
-	ConnectionId uint64 `json:"connectionId"`
+type FeishuChatItem struct {
+	archived.NoPKModel `json:"-"`
+	ConnectionId       uint64 `gorm:"primaryKey"`
+	ChatId             string `json:"chat_id" gorm:"primaryKey"`
+	Avatar             string `json:"avatar"`
+	Description        string `json:"description"`
+	External           bool   `json:"external"`
+	Name               string `json:"name"`
+	OwnerId            string `json:"owner_id"`
+	OwnerIdType        string `json:"owner_id_type"`
+	TenantKey          string `json:"tenant_key"`
 }
 
-type FeishuOptions struct {
-	ConnectionId       uint64  `json:"connectionId"`
-	NumOfDaysToCollect float64 `json:"numOfDaysToCollect"`
-}
-
-type FeishuTaskData struct {
-	Options                  *FeishuOptions
-	ApiClient                *helper.ApiAsyncClient
-	FeishuMeetingTopUserItem *models.FeishuMeetingTopUserItem
+func (FeishuChatItem) TableName() string {
+	return "_tool_feishu_chats"
 }
diff --git a/backend/plugins/feishu/models/migrationscripts/archived/message.go b/backend/plugins/feishu/models/migrationscripts/archived/message.go
new file mode 100644
index 000000000..e8bebebd3
--- /dev/null
+++ b/backend/plugins/feishu/models/migrationscripts/archived/message.go
@@ -0,0 +1,45 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package archived
+
+import (
+	"github.com/apache/incubator-devlake/core/models/migrationscripts/archived"
+	"time"
+)
+
+type FeishuMessage struct {
+	archived.NoPKModel `json:"-"`
+	ConnectionId       uint64    `gorm:"primaryKey"`
+	MessageId          string    `json:"message_id" gorm:"primaryKey"`
+	Content            string    `json:"content"`
+	ChatId             string    `json:"chat_id"`
+	MsgType            string    `json:"msg_type"`
+	ParentId           string    `json:"parent_id"`
+	RootId             string    `json:"root_id"`
+	SenderId           string    `json:"id"`
+	SenderIdType       string    `json:"id_type"`
+	SenderType         string    `json:"sender_type"`
+	Deleted            bool      `json:"deleted"`
+	CreateTime         time.Time `json:"create_time"`
+	UpdateTime         time.Time `json:"update_time"`
+	Updated            bool      `json:"updated"`
+}
+
+func (FeishuMessage) TableName() string {
+	return "_tool_feishu_messages"
+}
diff --git a/backend/plugins/feishu/tasks/api_client.go b/backend/plugins/feishu/tasks/api_client.go
index e015056d3..6bb3ed9fb 100644
--- a/backend/plugins/feishu/tasks/api_client.go
+++ b/backend/plugins/feishu/tasks/api_client.go
@@ -34,12 +34,12 @@ func NewFeishuApiClient(taskCtx plugin.TaskContext, connection *models.FeishuCon
 	}
 
 	// create async api client
-	asyncApiCLient, err := api.CreateAsyncApiClient(taskCtx, apiClient, &api.ApiRateLimitCalculator{
+	asyncApiClient, err := api.CreateAsyncApiClient(taskCtx, apiClient, &api.ApiRateLimitCalculator{
 		UserRateLimitPerHour: connection.RateLimitPerHour,
 	})
 	if err != nil {
 		return nil, err
 	}
 
-	return asyncApiCLient, nil
+	return asyncApiClient, nil
 }
diff --git a/backend/plugins/feishu/tasks/chat_collector.go b/backend/plugins/feishu/tasks/chat_collector.go
new file mode 100644
index 000000000..f76e8c022
--- /dev/null
+++ b/backend/plugins/feishu/tasks/chat_collector.go
@@ -0,0 +1,92 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tasks
+
+import (
+	"encoding/json"
+	"net/http"
+	"net/url"
+	"strconv"
+
+	"github.com/apache/incubator-devlake/core/errors"
+	"github.com/apache/incubator-devlake/core/plugin"
+	"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
+	"github.com/apache/incubator-devlake/plugins/feishu/apimodels"
+)
+
+const RAW_CHAT_TABLE = "feishu_chat_item"
+
+var _ plugin.SubTaskEntryPoint = CollectChat
+
+// CollectChat collects all chats that the bot is in
+func CollectChat(taskCtx plugin.SubTaskContext) errors.Error {
+	data := taskCtx.GetData().(*FeishuTaskData)
+	pageSize := 50
+	collector, err := api.NewApiCollector(api.ApiCollectorArgs{
+		RawDataSubTaskArgs: api.RawDataSubTaskArgs{
+			Ctx: taskCtx,
+			Params: FeishuApiParams{
+				ConnectionId: data.Options.ConnectionId,
+			},
+			Table: RAW_CHAT_TABLE,
+		},
+		ApiClient:   data.ApiClient,
+		Incremental: false,
+		UrlTemplate: "im/v1/chats",
+		PageSize:    pageSize,
+		GetNextPageCustomData: func(prevReqData *api.RequestData, prevPageResponse *http.Response) (interface{}, errors.Error) {
+			res := apimodels.FeishuImApiResult{}
+			err := api.UnmarshalResponse(prevPageResponse, &res)
+			if err != nil {
+				return nil, err
+			}
+			if !res.Data.HasMore { // the API reported no further pages
+				return nil, api.ErrFinishCollect
+			}
+			return res.Data.PageToken, nil // carried into the next request as CustomData
+		},
+		Query: func(reqData *api.RequestData) (url.Values, errors.Error) {
+			query := url.Values{}
+			query.Set("page_size", strconv.Itoa(pageSize))
+			if pageToken, ok := reqData.CustomData.(string); ok && pageToken != "" {
+				query.Set("page_token", pageToken) // reuse the value asserted above
+			}
+			return query, nil
+		},
+		ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
+			body := &apimodels.FeishuImApiResult{}
+			err := api.UnmarshalResponse(res, body)
+			if err != nil {
+				return nil, err
+			}
+			return body.Data.Items, nil
+		},
+	})
+	if err != nil {
+		return err
+	}
+
+	return collector.Execute()
+}
+
+var CollectChatMeta = plugin.SubTaskMeta{
+	Name:             "collectChat",
+	EntryPoint:       CollectChat,
+	EnabledByDefault: true,
+	Description:      "Collect chats from Feishu api",
+}
diff --git a/backend/plugins/feishu/tasks/api_client.go b/backend/plugins/feishu/tasks/chat_extractor.go
similarity index 51%
copy from backend/plugins/feishu/tasks/api_client.go
copy to backend/plugins/feishu/tasks/chat_extractor.go
index e015056d3..abad57fe5 100644
--- a/backend/plugins/feishu/tasks/api_client.go
+++ b/backend/plugins/feishu/tasks/chat_extractor.go
@@ -18,28 +18,45 @@ limitations under the License.
 package tasks
 
 import (
+	"encoding/json"
 	"github.com/apache/incubator-devlake/core/errors"
 	"github.com/apache/incubator-devlake/core/plugin"
 	"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
 	"github.com/apache/incubator-devlake/plugins/feishu/models"
 )
 
-// const AUTH_ENDPOINT = "https://open.feishu.cn"
-// const ENDPOINT = "https://open.feishu.cn/open-apis/vc/v1"
-
-func NewFeishuApiClient(taskCtx plugin.TaskContext, connection *models.FeishuConnection) (*api.ApiAsyncClient, errors.Error) {
-	apiClient, err := api.NewApiClientFromConnection(taskCtx.GetContext(), taskCtx, connection)
-	if err != nil {
-		return nil, err
-	}
-
-	// create async api client
-	asyncApiCLient, err := api.CreateAsyncApiClient(taskCtx, apiClient, &api.ApiRateLimitCalculator{
-		UserRateLimitPerHour: connection.RateLimitPerHour,
+var _ plugin.SubTaskEntryPoint = ExtractChatItem
+
+func ExtractChatItem(taskCtx plugin.SubTaskContext) errors.Error {
+	data := taskCtx.GetData().(*FeishuTaskData)
+	extractor, err := api.NewApiExtractor(api.ApiExtractorArgs{
+		RawDataSubTaskArgs: api.RawDataSubTaskArgs{
+			Ctx: taskCtx,
+			Params: FeishuApiParams{
+				ConnectionId: data.Options.ConnectionId,
+			},
+			Table: RAW_CHAT_TABLE,
+		},
+		Extract: func(row *api.RawData) ([]interface{}, errors.Error) {
+			body := &models.FeishuChatItem{}
+			err := errors.Convert(json.Unmarshal(row.Data, body))
+			if err != nil {
+				return nil, err
+			}
+			body.ConnectionId = data.Options.ConnectionId
+			return []interface{}{body}, nil
+		},
 	})
 	if err != nil {
-		return nil, err
+		return err
 	}
 
-	return asyncApiCLient, nil
+	return extractor.Execute()
+}
+
+var ExtractChatItemMeta = plugin.SubTaskMeta{
+	Name:             "extractChatItem",
+	EntryPoint:       ExtractChatItem,
+	EnabledByDefault: true,
+	Description:      "Extract raw chats data into tool layer table _tool_feishu_chats", // table of models.FeishuChatItem
+}
diff --git a/backend/plugins/feishu/tasks/message_collector.go b/backend/plugins/feishu/tasks/message_collector.go
new file mode 100644
index 000000000..3823f7f09
--- /dev/null
+++ b/backend/plugins/feishu/tasks/message_collector.go
@@ -0,0 +1,119 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tasks
+
+import (
+	"encoding/json"
+	"github.com/apache/incubator-devlake/core/dal"
+	"github.com/apache/incubator-devlake/core/errors"
+	"github.com/apache/incubator-devlake/core/plugin"
+	"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
+	"github.com/apache/incubator-devlake/plugins/feishu/apimodels"
+	"net/http"
+	"net/url"
+	"reflect"
+	"strconv"
+)
+
+const RAW_MESSAGE_TABLE = "feishu_message"
+
+var _ plugin.SubTaskEntryPoint = CollectMessage
+
+type ChatInput struct {
+	ChatId string `json:"chat_id"`
+}
+
+func CollectMessage(taskCtx plugin.SubTaskContext) errors.Error {
+	data := taskCtx.GetData().(*FeishuTaskData)
+	db := taskCtx.GetDal()
+
+	clauses := []dal.Clause{
+		dal.Select("chat_id AS chat_id"),
+		dal.From("_tool_feishu_chats"),
+		dal.Where("connection_id=?", data.Options.ConnectionId),
+	}
+
+	// build the input iterator: one ChatInput per chat row of this connection
+	cursor, err := db.Cursor(clauses...)
+	if err != nil {
+		return err
+	}
+	// only chat_id is selected, so a small struct keeps the memory footprint low
+	iterator, err := api.NewDalCursorIterator(db, cursor, reflect.TypeOf(ChatInput{}))
+	if err != nil {
+		return err
+	}
+
+	pageSize := 50
+	collector, err := api.NewApiCollector(api.ApiCollectorArgs{
+		RawDataSubTaskArgs: api.RawDataSubTaskArgs{
+			Ctx: taskCtx,
+			Params: FeishuApiParams{
+				ConnectionId: data.Options.ConnectionId,
+			},
+			Table: RAW_MESSAGE_TABLE,
+		},
+		ApiClient:   data.ApiClient,
+		Incremental: false,
+		Input:       iterator,
+		UrlTemplate: "im/v1/messages",
+		PageSize:    pageSize,
+		GetNextPageCustomData: func(prevReqData *api.RequestData, prevPageResponse *http.Response) (interface{}, errors.Error) {
+			res := apimodels.FeishuImApiResult{}
+			err := api.UnmarshalResponse(prevPageResponse, &res)
+			if err != nil {
+				return nil, err
+			}
+			if !res.Data.HasMore { // the API reported no further pages
+				return nil, api.ErrFinishCollect
+			}
+			return res.Data.PageToken, nil // carried into the next request as CustomData
+		},
+		Query: func(reqData *api.RequestData) (url.Values, errors.Error) {
+			input := reqData.Input.(*ChatInput)
+			query := url.Values{}
+			query.Set("container_id_type", "chat")
+			query.Set("container_id", input.ChatId)
+			query.Set("page_size", strconv.Itoa(pageSize))
+			if pageToken, ok := reqData.CustomData.(string); ok && pageToken != "" {
+				query.Set("page_token", pageToken) // reuse the value asserted above
+			}
+			return query, nil
+		},
+		ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
+			body := &apimodels.FeishuImApiResult{}
+			err := api.UnmarshalResponse(res, body)
+			if err != nil {
+				return nil, err
+			}
+			return body.Data.Items, nil
+		},
+	})
+	if err != nil {
+		return err
+	}
+
+	return collector.Execute()
+}
+
+var CollectMessageMeta = plugin.SubTaskMeta{
+	Name:             "collectMessage",
+	EntryPoint:       CollectMessage,
+	EnabledByDefault: true,
+	Description:      "Collect message from Feishu api",
+}
diff --git a/backend/plugins/feishu/tasks/message_extractor.go b/backend/plugins/feishu/tasks/message_extractor.go
new file mode 100644
index 000000000..f075b61f1
--- /dev/null
+++ b/backend/plugins/feishu/tasks/message_extractor.go
@@ -0,0 +1,87 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tasks
+
+import (
+	"encoding/json"
+	"github.com/apache/incubator-devlake/core/errors"
+	"github.com/apache/incubator-devlake/core/plugin"
+	"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
+	"github.com/apache/incubator-devlake/plugins/feishu/apimodels"
+	"github.com/apache/incubator-devlake/plugins/feishu/models"
+	"strconv"
+	"time"
+)
+
+var _ plugin.SubTaskEntryPoint = ExtractMessage
+
+func ExtractMessage(taskCtx plugin.SubTaskContext) errors.Error {
+	data := taskCtx.GetData().(*FeishuTaskData)
+	extractor, err := api.NewApiExtractor(api.ApiExtractorArgs{
+		RawDataSubTaskArgs: api.RawDataSubTaskArgs{
+			Ctx: taskCtx,
+			Params: FeishuApiParams{
+				ConnectionId: data.Options.ConnectionId,
+			},
+			Table: RAW_MESSAGE_TABLE,
+		},
+		Extract: func(row *api.RawData) ([]interface{}, errors.Error) {
+			body := &apimodels.FeishuMessageResultItem{}
+			err := errors.Convert(json.Unmarshal(row.Data, body))
+			if err != nil {
+				return nil, err
+			}
+			message := &models.FeishuMessage{}
+			message.ConnectionId = data.Options.ConnectionId
+			message.MessageId = body.MessageId
+			message.Content = body.Body.Content
+			message.ChatId = body.ChatId
+			message.MsgType = body.MsgType
+			message.ParentId = body.ParentId
+			message.RootId = body.RootId
+			message.SenderId = body.Sender.Id
+			message.SenderIdType = body.Sender.IdType
+			message.SenderType = body.Sender.SenderType
+			message.Deleted = body.Deleted
+			createTimestamp, err := errors.Convert01(strconv.ParseInt(body.CreateTime, 10, 64)) // ms epoch: too large for a 32-bit int
+			if err != nil {
+				return nil, err
+			}
+			message.CreateTime = time.UnixMilli(createTimestamp)
+			updateTimestamp, err := errors.Convert01(strconv.ParseInt(body.UpdateTime, 10, 64)) // ms epoch: too large for a 32-bit int
+			if err != nil {
+				return nil, err
+			}
+			message.UpdateTime = time.UnixMilli(updateTimestamp)
+			message.Updated = body.Updated
+			return []interface{}{message}, nil
+		},
+	})
+	if err != nil {
+		return err
+	}
+
+	return extractor.Execute()
+}
+
+var ExtractMessageMeta = plugin.SubTaskMeta{
+	Name:             "extractMessage", // must differ from ExtractChatItemMeta.Name
+	EntryPoint:       ExtractMessage,
+	EnabledByDefault: true,
+	Description:      "Extract raw messages data into tool layer table _tool_feishu_messages", // table of models.FeishuMessage
+}
diff --git a/backend/plugins/feishu/tasks/task_data.go b/backend/plugins/feishu/tasks/task_data.go
index 79099bc6a..428c06739 100644
--- a/backend/plugins/feishu/tasks/task_data.go
+++ b/backend/plugins/feishu/tasks/task_data.go
@@ -19,7 +19,6 @@ package tasks
 
 import (
 	helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
-	"github.com/apache/incubator-devlake/plugins/feishu/models"
 )
 
 type FeishuApiParams struct {
@@ -32,7 +31,6 @@ type FeishuOptions struct {
 }
 
 type FeishuTaskData struct {
-	Options                  *FeishuOptions
-	ApiClient                *helper.ApiAsyncClient
-	FeishuMeetingTopUserItem *models.FeishuMeetingTopUserItem
+	Options   *FeishuOptions
+	ApiClient *helper.ApiAsyncClient
 }