You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by kl...@apache.org on 2022/11/04 16:26:42 UTC

[incubator-devlake] branch main updated: [issue-3123][backend] Singer-spec framework support (#3151)

This is an automated email from the ASF dual-hosted git repository.

klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new fc93e331 [issue-3123][backend] Singer-spec framework support (#3151)
fc93e331 is described below

commit fc93e331613f8b5fd4ff54c58aaf5ca92f7b7d2f
Author: Keon Amini <ke...@merico.dev>
AuthorDate: Fri Nov 4 11:26:38 2022 -0500

    [issue-3123][backend] Singer-spec framework support (#3151)
    
    * feat: Singer-spec plugin framework implementation at the extractor layer
    
    * refactor: Singer-spec plugin framework reworked and adapted to the collector layer
    
    * refactor: Simplified the setup logic of the framework
    
    * fix: add license prependment to generated src files
    
    * fix: use empty json for default input
    
    * fix: fixed edge case when state is not the last message seen by the tap
    
    * refactor: converted raw task fields back to private again
    
    * refactor: generified the tap framework code to be less coupled with singer
---
 .env.example                                       |   3 +
 errors/types.go                                    |   3 +
 errors/util.go                                     |  30 +++
 go.mod                                             |   4 +-
 go.sum                                             |  42 +---
 helpers/pluginhelper/tap/models.go                 |  89 ++++++++
 helpers/pluginhelper/tap/singer_models.go          |  45 +++++
 helpers/pluginhelper/tap/singer_tap_client.go      |  47 +++++
 helpers/pluginhelper/tap/singer_tap_impl.go        | 224 +++++++++++++++++++++
 helpers/pluginhelper/tap/singer_tap_output.go      |  73 +++++++
 helpers/pluginhelper/tap/tap.go                    |  39 ++++
 helpers/pluginhelper/tap/tap_collector.go          | 217 ++++++++++++++++++++
 .../20221101_add_collector_state.go                |  49 +++++
 models/migrationscripts/archived/base.go           |   7 +
 models/migrationscripts/register.go                |   1 +
 plugins/helper/api_collector.go                    |   2 +-
 plugins/helper/api_extractor.go                    |   2 +-
 plugins/helper/api_rawdata.go                      |  13 +-
 plugins/helper/data_convertor.go                   |   2 +-
 plugins/helper/graphql_collector.go                |   2 +-
 scripts/singer-model-generator.sh                  |  84 ++++++++
 utils/ipc.go                                       | 114 +++++++++++
 22 files changed, 1052 insertions(+), 40 deletions(-)

diff --git a/.env.example b/.env.example
index f143ea40..19b43e57 100644
--- a/.env.example
+++ b/.env.example
@@ -31,6 +31,9 @@ LOGGING_DIR=
 ENABLE_STACKTRACE=false
 FORCE_MIGRATION=false
 
+# Lake SINGER API
+SINGER_PROPERTIES_DIR=
+
 ##########################
 # Sensitive information encryption key
 ##########################
diff --git a/errors/types.go b/errors/types.go
index 06bd82dd..f8db7c0a 100644
--- a/errors/types.go
+++ b/errors/types.go
@@ -72,6 +72,9 @@ func (t *Type) New(message string, opts ...Option) Error {
 
 // Wrap constructs a new Error instance with this message and wraps the passed in error. A nil 'err' will return a nil Error.
 func (t *Type) Wrap(err error, message string, opts ...Option) Error {
+	if err == nil {
+		return nil
+	}
 	return newSingleCrdbError(t, err, message, opts...)
 }
 
diff --git a/errors/util.go b/errors/util.go
new file mode 100644
index 00000000..5c6b0559
--- /dev/null
+++ b/errors/util.go
@@ -0,0 +1,30 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package errors
+
+import "errors"
+
+// Is convenience passthrough for the native errors.Is method
+func Is(err, target error) bool {
+	return errors.Is(err, target)
+}
+
+// As convenience passthrough for the native errors.As method
+func As(err error, target any) bool {
+	return errors.As(err, &target)
+}
diff --git a/go.mod b/go.mod
index 96192cd9..8d190a25 100644
--- a/go.mod
+++ b/go.mod
@@ -16,6 +16,7 @@ require (
 	github.com/magiconair/properties v1.8.5
 	github.com/manifoldco/promptui v0.9.0
 	github.com/merico-dev/graphql v0.0.0-20221027131946-77460a1fd4cd
+	github.com/mitchellh/hashstructure v1.1.0
 	github.com/mitchellh/mapstructure v1.4.1
 	github.com/panjf2000/ants/v2 v2.4.6
 	github.com/robfig/cron/v3 v3.0.0
@@ -33,6 +34,7 @@ require (
 	go.temporal.io/api v1.7.1-0.20220223032354-6e6fe738916a
 	go.temporal.io/sdk v1.14.0
 	golang.org/x/crypto v0.1.0
+	golang.org/x/exp v0.0.0-20221028150844-83b7d23a625f
 	golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602
 	golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4
 	gorm.io/datatypes v1.0.1
@@ -125,7 +127,7 @@ require (
 	golang.org/x/term v0.1.0 // indirect
 	golang.org/x/text v0.4.0 // indirect
 	golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect
-	golang.org/x/tools v0.1.12 // indirect
+	golang.org/x/tools v0.2.0 // indirect
 	google.golang.org/appengine v1.6.7 // indirect
 	google.golang.org/genproto v0.0.0-20220222213610-43724f9ea8cf // indirect
 	google.golang.org/grpc v1.44.0 // indirect
diff --git a/go.sum b/go.sum
index 8f17cd22..4677bdee 100644
--- a/go.sum
+++ b/go.sum
@@ -288,8 +288,8 @@ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
 github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
-github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
 github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
 github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
 github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
 github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
@@ -361,7 +361,6 @@ github.com/iris-contrib/i18n v0.0.0-20171121225848-987a633949d0/go.mod h1:pMCz62
 github.com/iris-contrib/jade v1.1.3/go.mod h1:H/geBymxJhShH5kecoiOCSssPX7QWYH7UaeZTSWddIk=
 github.com/iris-contrib/pongo2 v0.0.1/go.mod h1:Ssh+00+3GAZqSQb30AvBRNxBx7rf0GqwkjqxNd0u65g=
 github.com/iris-contrib/schema v0.0.1/go.mod h1:urYA3uvUNG1TIIjOSCzHr9/LmbQo8LrOcOqfqxa4hXw=
-github.com/jackc/chunkreader v1.0.0 h1:4s39bBR8ByfqH+DKm8rQA3E1LHZWB9XWcrz8fqaZbe0=
 github.com/jackc/chunkreader v1.0.0/go.mod h1:RT6O25fNZIuasFJRyZ4R/Y2BbhasbmZXF9QQ7T3kePo=
 github.com/jackc/chunkreader/v2 v2.0.0/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk=
 github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8=
@@ -375,8 +374,6 @@ github.com/jackc/pgconn v1.5.1-0.20200601181101-fa742c524853/go.mod h1:QeD3lBfpT
 github.com/jackc/pgconn v1.8.0/go.mod h1:1C2Pb36bGIP9QHGBYCjnyhqu7Rv3sGshaQUvmfGIB/o=
 github.com/jackc/pgconn v1.9.0/go.mod h1:YctiPyvzfU11JFxoXokUOOKQXQmDMoJL9vJzHH8/2JY=
 github.com/jackc/pgconn v1.9.1-0.20210724152538-d89c8390a530/go.mod h1:4z2w8XhRbP1hYxkpTuBjTS3ne3J48K83+u0zoyvg2pI=
-github.com/jackc/pgconn v1.12.0 h1:/RvQ24k3TnNdfBSW0ou9EOi5jx2cX7zfE8n2nLKuiP0=
-github.com/jackc/pgconn v1.12.0/go.mod h1:ZkhRC59Llhrq3oSfrikvwQ5NaxYExr6twkdkMLaKono=
 github.com/jackc/pgconn v1.13.0 h1:3L1XMNV2Zvca/8BYhzcRFS70Lr0WlDg16Di6SFGAbys=
 github.com/jackc/pgconn v1.13.0/go.mod h1:AnowpAqO4CMIIJNZl2VJp+KrkAZciAkhEl0W0JIobpI=
 github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE=
@@ -387,7 +384,6 @@ github.com/jackc/pgmock v0.0.0-20210724152146-4ad1a8207f65 h1:DadwsjnMwFjfWc9y5W
 github.com/jackc/pgmock v0.0.0-20210724152146-4ad1a8207f65/go.mod h1:5R2h2EEX+qri8jOWMbJCtaPWkrrNc7OHwsp2TCqp7ak=
 github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
 github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
-github.com/jackc/pgproto3 v1.1.0 h1:FYYE4yRw+AgI8wXIinMlNjBbp/UitDJwfj5LqqewP1A=
 github.com/jackc/pgproto3 v1.1.0/go.mod h1:eR5FA3leWg7p9aeAqi37XOTgTIbkABlvcPB3E5rlc78=
 github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190420180111-c116219b62db/go.mod h1:bhq50y+xrl9n5mRYyCBFKkpRVTLYJVWeCc+mEAI3yXA=
 github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190609003834-432c2951c711/go.mod h1:uH0AWtUmuShn0bcesswc4aBTWGvw0cAxIJp+6OB//Wg=
@@ -396,8 +392,6 @@ github.com/jackc/pgproto3/v2 v2.0.0-rc3.0.20190831210041-4c03ce451f29/go.mod h1:
 github.com/jackc/pgproto3/v2 v2.0.1/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA=
 github.com/jackc/pgproto3/v2 v2.0.6/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA=
 github.com/jackc/pgproto3/v2 v2.1.1/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA=
-github.com/jackc/pgproto3/v2 v2.3.0 h1:brH0pCGBDkBW07HWlN/oSBXrmo3WB0UvZd1pIuDcL8Y=
-github.com/jackc/pgproto3/v2 v2.3.0/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA=
 github.com/jackc/pgproto3/v2 v2.3.1 h1:nwj7qwf0S+Q7ISFfBndqeLwSwxs+4DPsbRFjECT1Y4Y=
 github.com/jackc/pgproto3/v2 v2.3.1/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA=
 github.com/jackc/pgservicefile v0.0.0-20200307190119-3430c5407db8/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E=
@@ -411,8 +405,6 @@ github.com/jackc/pgtype v1.3.1-0.20200510190516-8cd94a14c75a/go.mod h1:vaogEUkAL
 github.com/jackc/pgtype v1.3.1-0.20200606141011-f6355165a91c/go.mod h1:cvk9Bgu/VzJ9/lxTO5R5sf80p0DiucVtN7ZxvaC4GmQ=
 github.com/jackc/pgtype v1.6.2/go.mod h1:JCULISAZBFGrHaOXIIFiyfzW5VY0GRitRr8NeJsrdig=
 github.com/jackc/pgtype v1.8.1-0.20210724151600-32e20a603178/go.mod h1:C516IlIV9NKqfsMCXTdChteoXmwgUceqaLfjg2e3NlM=
-github.com/jackc/pgtype v1.11.0 h1:u4uiGPz/1hryuXzyaBhSk6dnIyyG2683olG2OV+UUgs=
-github.com/jackc/pgtype v1.11.0/go.mod h1:LUMuVrfsFfdKGLw+AFFVv6KtHOFMwRgDDzBt76IqCA4=
 github.com/jackc/pgtype v1.12.0 h1:Dlq8Qvcch7kiehm8wPGIW0W3KsCCHJnRacKW0UM8n5w=
 github.com/jackc/pgtype v1.12.0/go.mod h1:LUMuVrfsFfdKGLw+AFFVv6KtHOFMwRgDDzBt76IqCA4=
 github.com/jackc/pgx/v4 v4.0.0-20190420224344-cc3461e65d96/go.mod h1:mdxmSJJuR08CZQyj1PVQBHy9XOp5p8/SHH6a0psbY9Y=
@@ -423,8 +415,6 @@ github.com/jackc/pgx/v4 v4.6.1-0.20200510190926-94ba730bb1e9/go.mod h1:t3/cdRQl6
 github.com/jackc/pgx/v4 v4.6.1-0.20200606145419-4e5062306904/go.mod h1:ZDaNWkt9sW1JMiNn0kdYBaLelIhw7Pg4qd+Vk6tw7Hg=
 github.com/jackc/pgx/v4 v4.10.1/go.mod h1:QlrWebbs3kqEZPHCTGyxecvzG6tvIsYu+A5b1raylkA=
 github.com/jackc/pgx/v4 v4.12.1-0.20210724153913-640aa07df17c/go.mod h1:1QD0+tgSXP7iUjYm9C1NxKhny7lq6ee99u/z+IHFcgs=
-github.com/jackc/pgx/v4 v4.16.0 h1:4k1tROTJctHotannFYzu77dY3bgtMRymQP7tXQjqpPk=
-github.com/jackc/pgx/v4 v4.16.0/go.mod h1:N0A9sFdWzkw/Jy1lwoiB64F2+ugFZi987zRxcPez/wI=
 github.com/jackc/pgx/v4 v4.17.2 h1:0Ut0rpeKwvIVbMQ1KbMBU4h6wxehBI535LK6Flheh8E=
 github.com/jackc/pgx/v4 v4.17.2/go.mod h1:lcxIZN44yMIrWI78a5CpucdD14hX0SBDbNRvjDBItsw=
 github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
@@ -432,7 +422,6 @@ github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0f
 github.com/jackc/puddle v1.1.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
 github.com/jackc/puddle v1.1.1/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
 github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
-github.com/jackc/puddle v1.2.1/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
 github.com/jackc/puddle v1.3.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
 github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOlocH6Fxy8MmwDt+yVQYULKfN0RoTN8A=
 github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo=
@@ -441,7 +430,6 @@ github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD
 github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
 github.com/jinzhu/now v1.1.1/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
 github.com/jinzhu/now v1.1.2/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
-github.com/jinzhu/now v1.1.4 h1:tHnRBy1i5F2Dh8BAFxqFzxKqqvezXrL2OW1TnX+Mlas=
 github.com/jinzhu/now v1.1.4/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
 github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
 github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
@@ -540,8 +528,6 @@ github.com/mattn/goveralls v0.0.2/go.mod h1:8d1ZMHsd7fW6IRPKQh46F2WRpyib5/X4FOpe
 github.com/mediocregopher/mediocre-go-lib v0.0.0-20181029021733-cb65787f37ed/go.mod h1:dSsfyI2zABAdhcbvkXqgxOxrCsbYeHCPgrZkku60dSg=
 github.com/mediocregopher/radix/v3 v3.3.0/go.mod h1:EmfVyvspXz1uZEyPBMyGK+kjWiKQGvsUt6O3Pj+LDCQ=
 github.com/mediocregopher/radix/v3 v3.4.2/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8=
-github.com/merico-dev/graphql v0.0.0-20220804061427-a2245fa66df2 h1:sOXuZIg3OwBnvJFfIuO8wegiLpeDCOSVvk2dsbjurd8=
-github.com/merico-dev/graphql v0.0.0-20220804061427-a2245fa66df2/go.mod h1:dcDqG8HXVtfEhTCipFMa0Q+RTKTtDKIO2vJt+JVzHEQ=
 github.com/merico-dev/graphql v0.0.0-20221027131946-77460a1fd4cd h1:hGQXd4a72JSFIZE+ZVkH5ivE925PGogjob6stgc2too=
 github.com/merico-dev/graphql v0.0.0-20221027131946-77460a1fd4cd/go.mod h1:dcDqG8HXVtfEhTCipFMa0Q+RTKTtDKIO2vJt+JVzHEQ=
 github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d h1:5PJl274Y63IEHC+7izoQE9x6ikvDFZS2mDVS3drnohI=
@@ -554,6 +540,8 @@ github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG
 github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
 github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI=
 github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS42BGNg=
+github.com/mitchellh/hashstructure v1.1.0 h1:P6P1hdjqAAknpY/M1CGipelZgp+4y9ja9kmUZPXP+H0=
+github.com/mitchellh/hashstructure v1.1.0/go.mod h1:xUDAozZz0Wmdiufv0uyhnHkUTN6/6d8ulp4AwfLKrmA=
 github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY=
 github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
 github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
@@ -674,7 +662,6 @@ github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
-github.com/stretchr/objx v0.3.0 h1:NGXK3lHquSN08v5vWalVI/L8XU9hdzE/G6xsrze47As=
 github.com/stretchr/objx v0.3.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
 github.com/stretchr/objx v0.4.0 h1:M2gUjqZET1qApGOWNSnZ49BAIMX4F/1plDv3+l31EJ4=
 github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
@@ -683,7 +670,6 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
 github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
 github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
 github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
-github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
 github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
@@ -788,7 +774,6 @@ golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm
 golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
 golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
 golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
-golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 h1:7I4JAnoQBe7ZtJcBaYHi5UtiO8tQHbUSXxL+pnGRANg=
 golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
 golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
 golang.org/x/crypto v0.1.0 h1:MDRAIl0xIo9Io2xV565hzXHw3zVseKrJKodhohM5CjU=
@@ -803,6 +788,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0
 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
 golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
 golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
+golang.org/x/exp v0.0.0-20221028150844-83b7d23a625f h1:Al51T6tzvuh3oiwX11vex3QgJ2XTedFPGmbEVh8cdoc=
+golang.org/x/exp v0.0.0-20221028150844-83b7d23a625f/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
 golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
 golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
 golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
@@ -828,7 +815,7 @@ golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
 golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
 golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
 golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
-golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 h1:6zppjxzCulZykYSLyVDYbneBfbaBIQPYMevg0bEwv2s=
+golang.org/x/mod v0.6.0 h1:b9gGHsz9/HhJ3HF5DHQytPpuwocVTChQJK3AvoLRD5I=
 golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -879,8 +866,6 @@ golang.org/x/net v0.0.0-20211008194852-3b03d305991f/go.mod h1:9nx3DQGgdP8bBQD5qx
 golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
 golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
-golang.org/x/net v0.0.0-20220728211354-c7608f3a8462 h1:UreQrH7DbFXSi9ZFox6FNT3WBooWmdANpU+IfkT1T4I=
-golang.org/x/net v0.0.0-20220728211354-c7608f3a8462/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
 golang.org/x/net v0.1.0 h1:hZ/3BUoy5aId7sCpA/Tc5lt8DkFgdVS2onTpJsZ/fl0=
 golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@@ -906,7 +891,6 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
 golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 h1:uVc8UZUe6tr40fFVnUP5Oj+veunVezqYl9z7DYw9xzw=
 golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -982,13 +966,10 @@ golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBc
 golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220209214540-3681064d5158/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220222200937-f2425489ef4c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 h1:WIoqL4EROvwiPdUtaip4VcDdpZ4kha7wBWZrbVKCIZg=
-golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U=
 golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
 golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
-golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY=
 golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
 golang.org/x/term v0.1.0 h1:g6Z6vPFA9dYBAF7DWcH6sCcOntplXsDKcliusYijMlw=
 golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
@@ -1000,7 +981,6 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
-golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
 golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
 golang.org/x/text v0.4.0 h1:BrVqGRd7+k1DiOgtnFvAkoQEWQvBc25ouMJM6429SFg=
 golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
@@ -1072,10 +1052,8 @@ golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
 golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
 golang.org/x/tools v0.1.3/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
 golang.org/x/tools v0.1.7/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo=
-golang.org/x/tools v0.1.11 h1:loJ25fNOEhSXfHrpoGj91eCUThwdNX6u24rO1xnNteY=
-golang.org/x/tools v0.1.11/go.mod h1:SgwaegtQh8clINPpECJMqnxLv9I09HLqnW3RMqW0CA4=
-golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU=
-golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
+golang.org/x/tools v0.2.0 h1:G6AHpWxTMGY1KyEYoAQ5WTtIekUUvDNjan3ugu60JvE=
+golang.org/x/tools v0.2.0/go.mod h1:y4OqIKeOV/fWJetJ8bXPU1sEVniLMIyDAZWeHdV+NTA=
 golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
@@ -1236,8 +1214,6 @@ gorm.io/driver/mysql v1.0.5/go.mod h1:N1OIhHAIhx5SunkMGqWbGFVeh4yTNWKmMo1GOAsohL
 gorm.io/driver/mysql v1.3.3 h1:jXG9ANrwBc4+bMvBcSl8zCfPBaVoPyBEBshA8dA93X8=
 gorm.io/driver/mysql v1.3.3/go.mod h1:ChK6AHbHgDCFZyJp0F+BmVGb06PSIoh9uVYKAlRbb2U=
 gorm.io/driver/postgres v1.0.8/go.mod h1:4eOzrI1MUfm6ObJU/UcmbXyiHSs8jSwH95G5P5dxcAg=
-gorm.io/driver/postgres v1.3.5 h1:oVLmefGqBTlgeEVG6LKnH6krOlo4TZ3Q/jIK21KUMlw=
-gorm.io/driver/postgres v1.3.5/go.mod h1:EGCWefLFQSVFrHGy4J8EtiHCWX5Q8t0yz2Jt9aKkGzU=
 gorm.io/driver/postgres v1.4.5 h1:mTeXTTtHAgnS9PgmhN2YeUbazYpLhUI1doLnw42XUZc=
 gorm.io/driver/postgres v1.4.5/go.mod h1:GKNQYSJ14qvWkvPwXljMGehpKrhlDNsqYRr5HnYGncg=
 gorm.io/driver/sqlite v1.1.4 h1:PDzwYE+sI6De2+mxAneV9Xs11+ZyKV6oxD3wDGkaNvM=
@@ -1250,8 +1226,6 @@ gorm.io/gorm v1.21.3/go.mod h1:0HFTzE/SqkGTzK6TlDPPQbAYCluiVvhzoA1+aVyzenw=
 gorm.io/gorm v1.21.4/go.mod h1:0HFTzE/SqkGTzK6TlDPPQbAYCluiVvhzoA1+aVyzenw=
 gorm.io/gorm v1.21.6/go.mod h1:F+OptMscr0P2F2qU97WT1WimdH9GaQPoDW7AYd5i2Y0=
 gorm.io/gorm v1.23.1/go.mod h1:l2lP/RyAtc1ynaTjFksBde/O8v9oOGIApu2/xRitmZk=
-gorm.io/gorm v1.23.4 h1:1BKWM67O6CflSLcwGQR7ccfmC4ebOxQrTfOQGRE9wjg=
-gorm.io/gorm v1.23.4/go.mod h1:l2lP/RyAtc1ynaTjFksBde/O8v9oOGIApu2/xRitmZk=
 gorm.io/gorm v1.24.1-0.20221019064659-5dd2bb482755 h1:7AdrbfcvKnzejfqP5g37fdSZOXH/JvaPIzBIHTOqXKk=
 gorm.io/gorm v1.24.1-0.20221019064659-5dd2bb482755/go.mod h1:DVrVomtaYTbqs7gB/x2uVvqnXzv0nqjB396B8cG4dBA=
 honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
diff --git a/helpers/pluginhelper/tap/models.go b/helpers/pluginhelper/tap/models.go
new file mode 100644
index 00000000..67dcc573
--- /dev/null
+++ b/helpers/pluginhelper/tap/models.go
@@ -0,0 +1,89 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tap
+
+import (
+	"encoding/json"
+	"github.com/apache/incubator-devlake/models/migrationscripts/archived"
+	"github.com/apache/incubator-devlake/plugins/core"
+	"gorm.io/datatypes"
+	"time"
+)
+
+// abstract tap types
+type (
+	// Record the fields embedded in a singer-tap record. The specifics of the record are tap-implementation specific.
+	Record[R any] struct {
+		Type          string    `json:"type"`
+		Stream        string    `json:"stream"`
+		TimeExtracted time.Time `json:"time_extracted"`
+		Record        R         `json:"record"`
+	}
+	// State the fields embedded in a singer-tap state. The specifics of the value are tap-implementation specific.
+	State struct {
+		Type  string         `json:"type"`
+		Value map[string]any `json:"value"`
+	}
+
+	// RawState The raw-database version of State
+	RawState struct {
+		archived.GenericModel[string]
+		Type  string
+		Value datatypes.JSON
+	}
+
+	// Output raw data from a tap. One of these fields can ever be non-nil
+	Output[R any] interface {
+		// AsTapState tries to convert the map object to a State. Returns false if it can't be done.
+		AsTapState() (*State, bool)
+		// AsTapRecord tries to convert the map object to a Record. Returns false if it can't be done.
+		AsTapRecord() (*Record[R], bool)
+	}
+)
+
+// TableName the table name
+func (*RawState) TableName() string {
+	return "_devlake_collector_state"
+}
+
+// FromState converts State to RawState
+func FromState(t *State) *RawState {
+	b, err := json.Marshal(t.Value)
+	if err != nil {
+		panic(err)
+	}
+	return &RawState{
+		Type:  t.Type,
+		Value: b,
+	}
+}
+
+// ToState converts RawState to State
+func ToState(raw *RawState) *State {
+	val := new(map[string]any)
+	err := json.Unmarshal(raw.Value, val)
+	if err != nil {
+		panic(err)
+	}
+	return &State{
+		Type:  raw.Type,
+		Value: *val,
+	}
+}
+
+var _ core.Tabler = (*RawState)(nil)
diff --git a/helpers/pluginhelper/tap/singer_models.go b/helpers/pluginhelper/tap/singer_models.go
new file mode 100644
index 00000000..f6eebdd2
--- /dev/null
+++ b/helpers/pluginhelper/tap/singer_models.go
@@ -0,0 +1,45 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tap
+
+// singer-tap specific types
+type (
+	// SingerTapSchema the structure of this is determined by the catalog/properties JSON of a singer tap
+	SingerTapSchema map[string]any
+	// SingerTapMetadata the structure of this is determined by the catalog/properties JSON of a singer tap
+	SingerTapMetadata map[string]any
+	// SingerTapStream the deserialized version of each stream entry in the catalog/properties JSON of a singer tap
+	SingerTapStream struct {
+		Stream        string              `json:"stream"`
+		TapStreamId   string              `json:"tap_stream_id"`
+		Schema        SingerTapSchema     `json:"schema"`
+		Metadata      []SingerTapMetadata `json:"metadata"`
+		KeyProperties any                 `json:"key_properties"`
+	}
+	// SingerTapConfig the set of variables needed to initialize a SingerTap
+	SingerTapConfig struct {
+		Cmd                  string
+		StreamPropertiesFile string
+		IsLegacy             bool
+	}
+
+	// SingerTapProperties wraps SingerTapStreams
+	SingerTapProperties struct {
+		Streams []*SingerTapStream `json:"streams"`
+	}
+)
diff --git a/helpers/pluginhelper/tap/singer_tap_client.go b/helpers/pluginhelper/tap/singer_tap_client.go
new file mode 100644
index 00000000..452f5ebc
--- /dev/null
+++ b/helpers/pluginhelper/tap/singer_tap_client.go
@@ -0,0 +1,47 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tap
+
+import (
+	"github.com/apache/incubator-devlake/config"
+	"github.com/apache/incubator-devlake/errors"
+)
+
+// SingerTapArgs the args needed to instantiate tap.Tap for singer-taps
+type SingerTapArgs struct {
+	// Name of the env variable that expands to the tap binary path
+	TapClass string
+	// The name of the properties/catalog JSON file of the tap
+	StreamPropertiesFile string
+	// IsLegacy - set to true if this is an old tap that uses the "--properties" flag
+	IsLegacy bool
+}
+
+// NewSingerTapClient returns an instance of tap.Tap for singer-taps
+func NewSingerTapClient(args *SingerTapArgs) (*SingerTap, errors.Error) {
+	env := config.GetConfig()
+	cmd := env.GetString(args.TapClass)
+	if cmd == "" {
+		return nil, errors.Default.New("singer tap command not provided")
+	}
+	return NewSingerTap(&SingerTapConfig{
+		Cmd:                  cmd,
+		StreamPropertiesFile: args.StreamPropertiesFile,
+		IsLegacy:             args.IsLegacy,
+	})
+}
diff --git a/helpers/pluginhelper/tap/singer_tap_impl.go b/helpers/pluginhelper/tap/singer_tap_impl.go
new file mode 100644
index 00000000..4786443a
--- /dev/null
+++ b/helpers/pluginhelper/tap/singer_tap_impl.go
@@ -0,0 +1,224 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tap
+
+import (
+	"bufio"
+	"encoding/json"
+	"github.com/apache/incubator-devlake/config"
+	"github.com/apache/incubator-devlake/errors"
+	"github.com/apache/incubator-devlake/utils"
+	"github.com/mitchellh/hashstructure"
+	"os"
+	"os/exec"
+	"path/filepath"
+)
+
+const singerPropertiesDir = "SINGER_PROPERTIES_DIR"
+
+type (
+	// SingerTap the Singer implementation of Tap
+	SingerTap struct {
+		*SingerTapConfig
+		cmd            string
+		name           string
+		tempLocation   string
+		propertiesFile *fileData[SingerTapProperties]
+		stateFile      *fileData[[]byte]
+		configFile     *fileData[[]byte]
+	}
+	fileData[Content any] struct {
+		path    string
+		content *Content
+	}
+)
+
+// NewSingerTap the constructor for SingerTap
+func NewSingerTap(cfg *SingerTapConfig) (*SingerTap, errors.Error) {
+	tempDir, err := errors.Convert01(os.MkdirTemp("", "singer"+"_*"))
+	if err != nil {
+		return nil, errors.Default.Wrap(err, "couldn't create temp directory for singer-tap")
+	}
+	propsFile, err := readProperties(tempDir, cfg)
+	if err != nil {
+		return nil, err
+	}
+	tapName := filepath.Base(cfg.Cmd)
+	return &SingerTap{
+		cmd:             cfg.Cmd,
+		name:            tapName,
+		tempLocation:    tempDir,
+		propertiesFile:  propsFile,
+		SingerTapConfig: cfg,
+	}, nil
+}
+
+// SetConfig implements Tap.SetConfig
+func (t *SingerTap) SetConfig(cfg any) errors.Error {
+	b, err := json.Marshal(cfg)
+	if err != nil {
+		return errors.Default.Wrap(err, "error reading singer-tap mappings")
+	}
+	file, err := os.OpenFile(filepath.Join(t.tempLocation, "config.json"), os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.ModePerm)
+	if err != nil {
+		return errors.Default.Wrap(err, "error opening singer-tap config file")
+	}
+	_, err = file.Write(b)
+	if err != nil {
+		return errors.Default.Wrap(err, "error writing to singer-tap config file")
+	}
+	t.configFile = &fileData[[]byte]{
+		path:    file.Name(),
+		content: &b,
+	}
+	return nil
+}
+
+// SetState implements Tap.SetState
+func (t *SingerTap) SetState(state any) errors.Error {
+	b, err := json.Marshal(state)
+	if err != nil {
+		return errors.Default.Wrap(err, "error serializing singer-tap state")
+	}
+	file, err := os.OpenFile(filepath.Join(t.tempLocation, "state.json"), os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.ModePerm)
+	if err != nil {
+		return errors.Default.Wrap(err, "error opening singer-tap state file")
+	}
+	_, err = file.Write(b)
+	if err != nil {
+		return errors.Default.Wrap(err, "error writing to singer-tap state file")
+	}
+	t.stateFile = &fileData[[]byte]{
+		path:    file.Name(),
+		content: &b,
+	}
+	return nil
+}
+
+// SetProperties implements Tap.SetProperties
+func (t *SingerTap) SetProperties(streamName string, propsModifier func(props *SingerTapStream) bool) (uint64, errors.Error) {
+	modified := t.modifyProperties(streamName, propsModifier)
+	err := t.writeProperties()
+	if err != nil {
+		return 0, errors.Default.Wrap(err, "error trying to modify singer-tap properties")
+	}
+	return hash(modified)
+}
+
+// GetName implements Tap.GetName
+func (t *SingerTap) GetName() string {
+	return t.name
+}
+
+// Run implements Tap.Run
+func (t *SingerTap) Run() (<-chan *utils.ProcessResponse[Output[json.RawMessage]], errors.Error) {
+	catalogCmd := "--catalog"
+	if t.IsLegacy {
+		catalogCmd = "--properties"
+	}
+	args := []string{"--config", t.configFile.path, catalogCmd, t.propertiesFile.path}
+	if t.stateFile != nil {
+		args = append(args, []string{"--state", t.stateFile.path}...)
+	}
+	cmd := exec.Command(t.cmd, args...)
+	stream, err := utils.StreamProcess(cmd, func(b []byte) (Output[json.RawMessage], error) {
+		var output Output[json.RawMessage]
+		output, err := NewSingerTapOutput(b)
+		if err != nil {
+			return nil, err
+		}
+		return output, nil //data is expected to be JSON
+	})
+	if err != nil {
+		return nil, errors.Default.Wrap(err, "error starting process stream from singer-tap")
+	}
+	return stream, nil
+}
+
+func readProperties(tempDir string, cfg *SingerTapConfig) (*fileData[SingerTapProperties], errors.Error) {
+	globalDir := config.GetConfig().GetString(singerPropertiesDir)
+	_, err := os.Stat(globalDir)
+	if err != nil {
+		return nil, errors.Default.Wrap(err, "error getting singer props directory")
+	}
+	globalPath := filepath.Join(globalDir, cfg.StreamPropertiesFile)
+	b, err := os.ReadFile(globalPath)
+	if err != nil {
+		panic(err)
+	}
+	var props SingerTapProperties
+	err = json.Unmarshal(b, &props)
+	if err != nil {
+		return nil, errors.Default.Wrap(err, "error deserializing singer-tap properties")
+	}
+	return &fileData[SingerTapProperties]{
+		path:    filepath.Join(tempDir, "properties.json"),
+		content: &props,
+	}, nil
+}
+
+func (t *SingerTap) writeProperties() error {
+	file, err := os.OpenFile(t.propertiesFile.path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.ModePerm)
+	if err != nil {
+		return err
+	}
+	b, err := json.Marshal(t.propertiesFile.content)
+	if err != nil {
+		return err
+	}
+	writer := bufio.NewWriter(file)
+	if _, err = writer.Write(b); err != nil {
+		return err
+	}
+	return writer.Flush()
+}
+
+func hash(x any) (uint64, errors.Error) {
+	version, err := hashstructure.Hash(x, nil)
+	if err != nil {
+		return 0, errors.Default.WrapRaw(err)
+	}
+	return version, nil
+}
+
+var _ Tap[SingerTapStream] = (*SingerTap)(nil)
+
+func (t *SingerTap) modifyProperties(streamName string, propsModifier func(props *SingerTapStream) bool) *SingerTapStream {
+	properties := t.propertiesFile.content
+	for i := 0; i < len(properties.Streams); i++ {
+		stream := properties.Streams[i]
+		if !defaultSingerStreamSetter(streamName, stream) {
+			continue
+		}
+		if propsModifier != nil && propsModifier(stream) {
+			return stream
+		}
+	}
+	return nil
+}
+
+func defaultSingerStreamSetter(name string, stream *SingerTapStream) bool {
+	if stream.Stream != name {
+		return false
+	}
+	for _, meta := range stream.Metadata {
+		innerMeta := meta["metadata"].(map[string]any)
+		innerMeta["selected"] = true
+	}
+	return true
+}
diff --git a/helpers/pluginhelper/tap/singer_tap_output.go b/helpers/pluginhelper/tap/singer_tap_output.go
new file mode 100644
index 00000000..0e8bacd6
--- /dev/null
+++ b/helpers/pluginhelper/tap/singer_tap_output.go
@@ -0,0 +1,73 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tap
+
+import (
+	"encoding/json"
+	"github.com/apache/incubator-devlake/errors"
+)
+
+// SingerOutput raw data from a tap. One of these fields can ever be non-nil
+type SingerOutput struct {
+	state  *State
+	record *Record[json.RawMessage]
+}
+
+// NewSingerTapOutput construct for Output. The src is the raw data coming from the tap
+func NewSingerTapOutput(src json.RawMessage) (Output[json.RawMessage], errors.Error) {
+	srcMap := map[string]any{}
+	err := convert(src, &srcMap)
+	if err != nil {
+		return nil, err
+	}
+	ret := &SingerOutput{}
+	srcType := srcMap["type"]
+	if srcType == "STATE" {
+		state := State{}
+		if err = convert(src, &state); err != nil {
+			return nil, err
+		}
+		ret.state = &state
+	} else if srcType == "RECORD" {
+		record := Record[json.RawMessage]{}
+		if err = convert(src, &record); err != nil {
+			return nil, err
+		}
+		ret.record = &record
+	}
+	return ret, nil
+}
+
+// AsTapState tries to convert the map object to a State. Returns false if it can't be done.
+func (r *SingerOutput) AsTapState() (*State, bool) {
+	return r.state, r.state != nil
+}
+
+// AsTapRecord tries to convert the map object to a Record. Returns false if it can't be done.
+func (r *SingerOutput) AsTapRecord() (*Record[json.RawMessage], bool) {
+	return r.record, r.record != nil
+}
+
+func convert(src json.RawMessage, dest any) errors.Error {
+	if err := json.Unmarshal(src, dest); err != nil {
+		return errors.Default.Wrap(err, "error converting type")
+	}
+	return nil
+}
+
+var _ Output[json.RawMessage] = (*SingerOutput)(nil)
diff --git a/helpers/pluginhelper/tap/tap.go b/helpers/pluginhelper/tap/tap.go
new file mode 100644
index 00000000..4d54958c
--- /dev/null
+++ b/helpers/pluginhelper/tap/tap.go
@@ -0,0 +1,39 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tap
+
+import (
+	"encoding/json"
+	"github.com/apache/incubator-devlake/errors"
+	"github.com/apache/incubator-devlake/utils"
+)
+
+// Tap the abstract interface for Taps. Consumer code should not use concrete implementations directly.
+type Tap[Stream any] interface {
+	// Run runs the tap and returns a stream of results. Expected to be called after all the other Setters.
+	Run() (<-chan *utils.ProcessResponse[Output[json.RawMessage]], errors.Error)
+	// GetName the name of this tap
+	GetName() string
+	// SetProperties Sets the properties of the tap and allows you to modify the properties at runtime.
+	// Returns a unique hash representing the properties object.
+	SetProperties(streamName string, propsModifier func(props *Stream) bool) (uint64, errors.Error)
+	// SetState sets state on this tap
+	SetState(state any) errors.Error
+	// SetConfig sets the config of this tap
+	SetConfig(config any) errors.Error
+}
diff --git a/helpers/pluginhelper/tap/tap_collector.go b/helpers/pluginhelper/tap/tap_collector.go
new file mode 100644
index 00000000..aac67fde
--- /dev/null
+++ b/helpers/pluginhelper/tap/tap_collector.go
@@ -0,0 +1,217 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tap
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/apache/incubator-devlake/errors"
+	"github.com/apache/incubator-devlake/plugins/core"
+	"github.com/apache/incubator-devlake/plugins/core/dal"
+	"github.com/apache/incubator-devlake/plugins/helper"
+	"gorm.io/gorm"
+)
+
+// CollectorArgs args to initialize a Collector
+type CollectorArgs[Stream any] struct {
+	helper.RawDataSubTaskArgs
+	// The function that creates and returns a tap client
+	TapClient Tap[Stream]
+	// Optional - This function is called for the selected streams at runtime. Use this if any runtime modification is needed.
+	TapStreamModifier func(stream *Stream) bool
+	// The config the tap needs at runtime in order to execute
+	TapConfig any
+	// The specific tap stream to invoke at runtime
+	StreamName   string
+	ConnectionId uint64
+	Table        string
+	Incremental  bool
+}
+
+// Collector the collector that communicates with singer taps
+type Collector[Stream any] struct {
+	ctx               core.SubTaskContext
+	rawSubtask        *helper.RawDataSubTask
+	tapClient         Tap[Stream]
+	tapStreamModifier func(stream *Stream) bool
+	tapConfig         any
+	streamVersion     uint64
+	streamName        string
+	connectionId      uint64
+	incremental       bool
+}
+
+// NewTapCollector constructor for Collector
+func NewTapCollector[Stream any](args *CollectorArgs[Stream]) (*Collector[Stream], errors.Error) {
+	rawDataSubTask, err := helper.NewRawDataSubTask(args.RawDataSubTaskArgs)
+	if err != nil {
+		return nil, err
+	}
+	collector := &Collector[Stream]{
+		ctx:               args.Ctx,
+		rawSubtask:        rawDataSubTask,
+		tapClient:         args.TapClient,
+		tapStreamModifier: args.TapStreamModifier,
+		streamName:        args.StreamName,
+		tapConfig:         args.TapConfig,
+		connectionId:      args.ConnectionId,
+		incremental:       args.Incremental,
+	}
+	if err = collector.prepareTap(); err != nil {
+		return nil, err
+	}
+	return collector, nil
+}
+
+func (c *Collector[Stream]) getState() (*State, errors.Error) {
+	db := c.ctx.GetDal()
+	rawState := RawState{}
+	rawState.ID = c.getStateId()
+	if err := db.First(&rawState); err != nil {
+		if errors.Is(err, gorm.ErrRecordNotFound) {
+			return nil, errors.NotFound.Wrap(err, "record not found")
+		}
+		return nil, err
+	}
+	return ToState(&rawState), nil
+}
+
+func (c *Collector[Stream]) pushState(state *State) errors.Error {
+	db := c.ctx.GetDal()
+	rawState := FromState(state)
+	rawState.ID = c.getStateId()
+	return db.CreateOrUpdate(rawState)
+}
+
+func (c *Collector[Stream]) getStateId() string {
+	return fmt.Sprintf("{%s:%d:%d}", fmt.Sprintf("%s::%s", c.tapClient.GetName(), c.streamName), c.connectionId, c.streamVersion)
+}
+
+func (c *Collector[Stream]) prepareTap() errors.Error {
+	if c.tapConfig == nil {
+		return errors.Default.New("no tap config is set")
+	}
+	err := c.tapClient.SetConfig(c.tapConfig)
+	if err != nil {
+		return err
+	}
+	c.streamVersion, err = c.tapClient.SetProperties(c.streamName, c.tapStreamModifier)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+// Execute executes the collector
+func (c *Collector[Stream]) Execute() (err errors.Error) {
+	initialState, err := c.getState()
+	if err != nil && err.GetType() != errors.NotFound {
+		return err
+	}
+	if initialState != nil {
+		err = c.tapClient.SetState(initialState.Value)
+		if err != nil {
+			return err
+		}
+	}
+	resultStream, err := c.tapClient.Run()
+	if err != nil {
+		return err
+	}
+	err = c.prepareDB()
+	if err != nil {
+		return err
+	}
+	c.ctx.SetProgress(0, -1)
+	ctx := c.ctx.GetContext()
+	var batchedResults []json.RawMessage
+	defer func() {
+		// push whatever is left
+		err = c.pushResults(batchedResults)
+	}()
+	for result := range resultStream {
+		if result.Err != nil {
+			err = errors.Default.Wrap(result.Err, "error found in streamed tap result")
+			return err
+		}
+		select {
+		case <-ctx.Done():
+			err = errors.Convert(ctx.Err())
+			return err
+		default:
+		}
+		output := result.Data
+		if tapRecord, ok := output.AsTapRecord(); ok {
+			batchedResults = append(batchedResults, tapRecord.Record)
+			c.ctx.IncProgress(1)
+			continue
+		} else if tapState, ok := output.AsTapState(); ok {
+			err = c.pushResults(batchedResults)
+			if err != nil {
+				return err
+			}
+			tapState.Type = fmt.Sprintf("TAP_%s", tapState.Type)
+			err = c.pushState(tapState)
+			if err != nil {
+				return errors.Default.Wrap(err, "error saving tap state")
+			}
+			batchedResults = nil
+			continue
+		}
+	}
+	return nil
+}
+
+func (c *Collector[Stream]) pushResults(results []json.RawMessage) errors.Error {
+	if len(results) == 0 {
+		return nil
+	}
+	c.ctx.GetLogger().Info("%s flushing %d records", c.tapClient.GetName(), len(results))
+	rows := make([]*helper.RawData, len(results))
+	defaultInput, _ := json.Marshal(nil)
+	for i, result := range results {
+		rows[i] = &helper.RawData{
+			Params: c.rawSubtask.GetParams(),
+			Data:   result,
+			Url:    "",           // n/a
+			Input:  defaultInput, // n/a
+		}
+	}
+	err := c.ctx.GetDal().Create(rows, dal.From(c.rawSubtask.GetTable()))
+	if err != nil {
+		return errors.Default.Wrap(err, "error pushing records to collector table")
+	}
+	return nil
+}
+
+func (c *Collector[Stream]) prepareDB() errors.Error {
+	db := c.ctx.GetDal()
+	err := db.AutoMigrate(&helper.RawData{}, dal.From(c.rawSubtask.GetTable()))
+	if err != nil {
+		return errors.Default.Wrap(err, "error auto-migrating collector")
+	}
+	if !c.incremental {
+		err = c.ctx.GetDal().Delete(&helper.RawData{}, dal.From(c.rawSubtask.GetTable()), dal.Where("params = ?", c.rawSubtask.GetParams()))
+		if err != nil {
+			return errors.Default.Wrap(err, "error deleting data from collector")
+		}
+	}
+	return nil
+}
+
+var _ core.SubTask = (*Collector[any])(nil)
diff --git a/models/migrationscripts/20221101_add_collector_state.go b/models/migrationscripts/20221101_add_collector_state.go
new file mode 100644
index 00000000..cfbe21c7
--- /dev/null
+++ b/models/migrationscripts/20221101_add_collector_state.go
@@ -0,0 +1,49 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package migrationscripts
+
+import (
+	"github.com/apache/incubator-devlake/errors"
+	commonArchived "github.com/apache/incubator-devlake/models/migrationscripts/archived"
+	"github.com/apache/incubator-devlake/plugins/core"
+	"gorm.io/datatypes"
+)
+
+type createCollectorState struct{}
+
+type CollectorState20221101 struct {
+	commonArchived.GenericModel[string]
+	Type  string
+	Value datatypes.JSON
+}
+
+func (CollectorState20221101) TableName() string {
+	return "_devlake_collector_state"
+}
+
+func (*createCollectorState) Up(basicRes core.BasicRes) errors.Error {
+	return basicRes.GetDal().AutoMigrate(CollectorState20221101{})
+}
+
+func (*createCollectorState) Version() uint64 {
+	return 20221101000001
+}
+
+func (*createCollectorState) Name() string {
+	return "Create collector state table"
+}
diff --git a/models/migrationscripts/archived/base.go b/models/migrationscripts/archived/base.go
index b64b74a1..2d37aa07 100644
--- a/models/migrationscripts/archived/base.go
+++ b/models/migrationscripts/archived/base.go
@@ -18,6 +18,7 @@ limitations under the License.
 package archived
 
 import (
+	"golang.org/x/exp/constraints"
 	"time"
 )
 
@@ -32,6 +33,12 @@ type Model struct {
 	UpdatedAt time.Time `json:"updatedAt"`
 }
 
+type GenericModel[T string | constraints.Unsigned] struct {
+	ID        T         `gorm:"primaryKey" json:"id"`
+	CreatedAt time.Time `json:"createdAt"`
+	UpdatedAt time.Time `json:"updatedAt"`
+}
+
 type NoPKModel struct {
 	CreatedAt time.Time `json:"createdAt"`
 	UpdatedAt time.Time `json:"updatedAt"`
diff --git a/models/migrationscripts/register.go b/models/migrationscripts/register.go
index 225da018..93f9718a 100644
--- a/models/migrationscripts/register.go
+++ b/models/migrationscripts/register.go
@@ -51,5 +51,6 @@ func All() []core.MigrationScript {
 		new(commitLineChange),
 		new(changeLeadTimeMinutesToInt64),
 		new(addRepoSnapshot),
+		new(createCollectorState),
 	}
 }
diff --git a/plugins/helper/api_collector.go b/plugins/helper/api_collector.go
index 5b0142b7..4944b4e4 100644
--- a/plugins/helper/api_collector.go
+++ b/plugins/helper/api_collector.go
@@ -94,7 +94,7 @@ type ApiCollector struct {
 // of response we want to save, ApiCollector will collect them from remote server and store them into database.
 func NewApiCollector(args ApiCollectorArgs) (*ApiCollector, errors.Error) {
 	// process args
-	rawDataSubTask, err := newRawDataSubTask(args.RawDataSubTaskArgs)
+	rawDataSubTask, err := NewRawDataSubTask(args.RawDataSubTaskArgs)
 	if err != nil {
 		return nil, errors.Default.Wrap(err, "Couldn't resolve raw subtask args")
 	}
diff --git a/plugins/helper/api_extractor.go b/plugins/helper/api_extractor.go
index dae6ae49..4803026e 100644
--- a/plugins/helper/api_extractor.go
+++ b/plugins/helper/api_extractor.go
@@ -48,7 +48,7 @@ type ApiExtractor struct {
 // NewApiExtractor creates a new ApiExtractor
 func NewApiExtractor(args ApiExtractorArgs) (*ApiExtractor, errors.Error) {
 	// process args
-	rawDataSubTask, err := newRawDataSubTask(args.RawDataSubTaskArgs)
+	rawDataSubTask, err := NewRawDataSubTask(args.RawDataSubTaskArgs)
 	if err != nil {
 		return nil, err
 	}
diff --git a/plugins/helper/api_rawdata.go b/plugins/helper/api_rawdata.go
index a569674d..add2338c 100644
--- a/plugins/helper/api_rawdata.go
+++ b/plugins/helper/api_rawdata.go
@@ -56,7 +56,8 @@ type RawDataSubTask struct {
 	params string
 }
 
-func newRawDataSubTask(args RawDataSubTaskArgs) (*RawDataSubTask, errors.Error) {
+// NewRawDataSubTask constructor for RawDataSubTask
+func NewRawDataSubTask(args RawDataSubTaskArgs) (*RawDataSubTask, errors.Error) {
 	if args.Ctx == nil {
 		return nil, errors.Default.New("Ctx is required for RawDataSubTask")
 	}
@@ -80,3 +81,13 @@ func newRawDataSubTask(args RawDataSubTaskArgs) (*RawDataSubTask, errors.Error)
 		params: paramsString,
 	}, nil
 }
+
+// GetTable returns the raw table name
+func (r *RawDataSubTask) GetTable() string {
+	return r.table
+}
+
+// GetParams returns the raw params
+func (r *RawDataSubTask) GetParams() string {
+	return r.params
+}
diff --git a/plugins/helper/data_convertor.go b/plugins/helper/data_convertor.go
index 848a677b..6b1aacc8 100644
--- a/plugins/helper/data_convertor.go
+++ b/plugins/helper/data_convertor.go
@@ -59,7 +59,7 @@ type DataConverter struct {
 // NewDataConverter function helps you create a DataConverter using DataConverterArgs.
 // You can see the usage in plugins/github/tasks/pr_issue_convertor.go or other convertor file.
 func NewDataConverter(args DataConverterArgs) (*DataConverter, errors.Error) {
-	rawDataSubTask, err := newRawDataSubTask(args.RawDataSubTaskArgs)
+	rawDataSubTask, err := NewRawDataSubTask(args.RawDataSubTaskArgs)
 	if err != nil {
 		return nil, err
 	}
diff --git a/plugins/helper/graphql_collector.go b/plugins/helper/graphql_collector.go
index c03a9eec..1d41e584 100644
--- a/plugins/helper/graphql_collector.go
+++ b/plugins/helper/graphql_collector.go
@@ -86,7 +86,7 @@ type GraphqlCollector struct {
 // of response we want to save, GraphqlCollector will collect them from remote server and store them into database.
 func NewGraphqlCollector(args GraphqlCollectorArgs) (*GraphqlCollector, errors.Error) {
 	// process args
-	rawDataSubTask, err := newRawDataSubTask(args.RawDataSubTaskArgs)
+	rawDataSubTask, err := NewRawDataSubTask(args.RawDataSubTaskArgs)
 	if err != nil {
 		return nil, errors.Default.Wrap(err, "error processing raw subtask args")
 	}
diff --git a/scripts/singer-model-generator.sh b/scripts/singer-model-generator.sh
new file mode 100644
index 00000000..79bf04bc
--- /dev/null
+++ b/scripts/singer-model-generator.sh
@@ -0,0 +1,84 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#===================================== constants =======================================
+
+time_format='
+  {
+    "type": "time.Time",
+    "imports": ["time"]
+  }
+'
+
+#======================================================================================
+
+json_path=$1 # e.g. "./config/singer/github.json"
+tap_stream=$2 # e.g. "issues"
+plugin_path=$3 # e.g. "./plugins/github_singer"
+
+if [ $# != 3 ]; then
+  printf "not enough args. Usage: <json_path> <tap_stream> <output_path>: e.g.\n    \"./config/singer/github.json\" \"issues\" \"./plugins/github_singer\"\n"
+  exit 1
+fi
+
+package="generated"
+file_name="$tap_stream".go
+output_path="$plugin_path/models/generated/$file_name"
+
+tmp_dir=$(mktemp -d -t schema-XXXXX)
+
+json_schema_path="$tmp_dir"/"$tap_stream"
+
+# add, as necessary, more elif blocks for additional transformations
+modified_schema=$(cat "$json_path" |  jq --argjson tf "$time_format" '
+    .streams[] |
+    select(.stream=="'"$tap_stream"'").schema |
+      . += { "$schema": "http://json-schema.org/draft-07/schema#" } |
+      walk(
+        if type == "object" and .format == "date-time" then
+          . += { "goJSONSchema": ($tf) }
+        elif "place_holder" == "" then
+          empty
+        else . end
+      )
+')
+
+# additional cleanup
+modified_schema=$(echo "$modified_schema" | sed -r "/\"null\",/d")
+modified_schema=$(echo "$modified_schema" | sed -r "/.*additionalProperties.*/d")
+
+echo "$modified_schema" > "$json_schema_path" &&\
+
+# see output
+cat "$json_schema_path" | jq -r
+
+exitcode=$?
+if [ $exitcode != 0 ]; then
+  exit $exitcode
+fi
+
+gojsonschema -v -p "$package" "$json_schema_path" -o "$output_path"
+
+echo "$output_path"
+
+# prepend the license text to the generated files
+cp "$output_path" "$output_path".bak
+license_header="$(printf "/*\n%s\n/*\n" "$(cat .golangci-goheader.template)")"
+echo "$license_header" > "$output_path"
+cat "$output_path".bak >> "$output_path"
+rm "$output_path".bak
\ No newline at end of file
diff --git a/utils/ipc.go b/utils/ipc.go
new file mode 100644
index 00000000..efad7509
--- /dev/null
+++ b/utils/ipc.go
@@ -0,0 +1,114 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package utils
+
+import (
+	"bufio"
+	"fmt"
+	"github.com/apache/incubator-devlake/errors"
+	"os"
+	"os/exec"
+	"strings"
+	"sync"
+)
+
+// ProcessResponse wraps output of a process
+type ProcessResponse[T any] struct {
+	Data T
+	Err  error
+}
+
+// RunProcess runs the cmd and returns its raw standard output. This is a blocking function.
+func RunProcess(cmd *exec.Cmd) (*ProcessResponse[[]byte], error) {
+	cmd.Env = append(cmd.Env, os.Environ()...)
+	stderr, err := cmd.StderrPipe()
+	if err != nil {
+		return nil, err
+	}
+	remoteErrorMsg := &strings.Builder{}
+	go func() {
+		scanner := bufio.NewScanner(stderr)
+		scanner.Split(bufio.ScanLines)
+		for scanner.Scan() {
+			_, _ = remoteErrorMsg.Write(scanner.Bytes())
+			_, _ = remoteErrorMsg.WriteString("\n")
+		}
+	}()
+	output, err := cmd.Output()
+	if err != nil {
+		return nil, errors.Default.Wrap(err, fmt.Sprintf("remote error message:\n%s", remoteErrorMsg.String()))
+	}
+	return &ProcessResponse[[]byte]{
+		Data: output,
+	}, nil
+}
+
+// StreamProcess runs the cmd and returns its standard output on a line-by-line basis, on a channel. The converter functor will allow you
+// to convert the incoming raw to your custom data type T. This is a nonblocking function.
+func StreamProcess[T any](cmd *exec.Cmd, converter func(b []byte) (T, error)) (<-chan *ProcessResponse[T], error) {
+	cmd.Env = append(cmd.Env, os.Environ()...)
+	stdout, err := cmd.StdoutPipe()
+	if err != nil {
+		return nil, err
+	}
+	stderr, err := cmd.StderrPipe()
+	if err != nil {
+		return nil, err
+	}
+	if err = cmd.Start(); err != nil {
+		return nil, err
+	}
+	stream := make(chan *ProcessResponse[T], 32)
+	wg := &sync.WaitGroup{}
+	wg.Add(2)
+	go func() {
+		scanner := bufio.NewScanner(stdout)
+		scanner.Split(bufio.ScanLines)
+		for scanner.Scan() {
+			src := scanner.Bytes()
+			data := make([]byte, len(src))
+			copy(data, src)
+			if result, err := converter(data); err != nil {
+				stream <- &ProcessResponse[T]{Err: err}
+			} else {
+				stream <- &ProcessResponse[T]{Data: result}
+			}
+		}
+		wg.Done()
+	}()
+	remoteErrorMsg := &strings.Builder{}
+	go func() {
+		scanner := bufio.NewScanner(stderr)
+		scanner.Split(bufio.ScanLines)
+		for scanner.Scan() {
+			_, _ = remoteErrorMsg.Write(scanner.Bytes())
+			_, _ = remoteErrorMsg.WriteString("\n")
+		}
+	}()
+	go func() {
+		if err = cmd.Wait(); err != nil {
+			stream <- &ProcessResponse[T]{Err: errors.Default.Wrap(err, fmt.Sprintf("remote error response:\n%s", remoteErrorMsg))}
+		}
+		wg.Done()
+	}()
+	go func() {
+		defer close(stream)
+		wg.Wait()
+	}()
+	return stream, nil
+}