You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/12/28 17:55:59 UTC

[beam] branch master updated: [Go SDK]: MongoDB IO connector (#24663)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e763ad6d18 [Go SDK]: MongoDB IO connector (#24663)
1e763ad6d18 is described below

commit 1e763ad6d18f80de21d2db0d637cc7e8800d45ab
Author: Johanna Öjeling <51...@users.noreply.github.com>
AuthorDate: Wed Dec 28 18:55:47 2022 +0100

    [Go SDK]: MongoDB IO connector (#24663)
---
 CHANGES.md                                         |   1 +
 sdks/go.mod                                        |   9 +-
 sdks/go.sum                                        |  23 +-
 sdks/go/pkg/beam/io/mongodbio/coder.go             |  68 +++
 sdks/go/pkg/beam/io/mongodbio/coder_test.go        | 160 +++++++
 sdks/go/pkg/beam/io/mongodbio/common.go            |  73 +++
 sdks/go/pkg/beam/io/mongodbio/example_test.go      | 180 ++++++++
 sdks/go/pkg/beam/io/mongodbio/helper_test.go       |  33 ++
 sdks/go/pkg/beam/io/mongodbio/read.go              | 492 +++++++++++++++++++++
 sdks/go/pkg/beam/io/mongodbio/read_option.go       |  60 +++
 sdks/go/pkg/beam/io/mongodbio/read_option_test.go  | 115 +++++
 sdks/go/pkg/beam/io/mongodbio/read_test.go         | 393 ++++++++++++++++
 sdks/go/pkg/beam/io/mongodbio/write.go             | 204 +++++++++
 sdks/go/pkg/beam/io/mongodbio/write_option.go      |  50 +++
 sdks/go/pkg/beam/io/mongodbio/write_option_test.go |  83 ++++
 sdks/go/pkg/beam/io/mongodbio/write_test.go        |  54 +++
 sdks/go/pkg/beam/util/structx/struct.go            |  19 +
 sdks/go/pkg/beam/util/structx/struct_test.go       |  60 +++
 sdks/go/test/integration/integration.go            |   1 +
 .../integration/internal/containers/containers.go  |  82 ++++
 .../test/integration/io/mongodbio/helper_test.go   | 121 +++++
 .../integration/io/mongodbio/mongodbio_test.go     | 237 ++++++++++
 22 files changed, 2516 insertions(+), 2 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 5bbfebec6b1..e856ef18dab 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -59,6 +59,7 @@
 ## I/Os
 
 * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+* MongoDB IO connector added (Go) ([#24575](https://github.com/apache/beam/issues/24575)).
 
 ## New Features / Improvements
 
diff --git a/sdks/go.mod b/sdks/go.mod
index c7ef57eedbe..e9d206ee49b 100644
--- a/sdks/go.mod
+++ b/sdks/go.mod
@@ -48,6 +48,7 @@ require (
 	github.com/testcontainers/testcontainers-go v0.15.0
 	github.com/xitongsys/parquet-go v1.6.2
 	github.com/xitongsys/parquet-go-source v0.0.0-20220315005136-aec0fe3e777c
+	go.mongodb.org/mongo-driver v1.11.1
 	golang.org/x/net v0.4.0
 	golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783
 	golang.org/x/sync v0.1.0
@@ -113,11 +114,12 @@ require (
 	github.com/googleapis/gax-go/v2 v2.7.0 // indirect
 	github.com/inconshreveable/mousetrap v1.0.1 // indirect
 	github.com/jmespath/go-jmespath v0.4.0 // indirect
-	github.com/klauspost/compress v1.13.1 // indirect
+	github.com/klauspost/compress v1.13.6 // indirect
 	github.com/magiconair/properties v1.8.6 // indirect
 	github.com/moby/sys/mount v0.3.3 // indirect
 	github.com/moby/sys/mountinfo v0.6.2 // indirect
 	github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 // indirect
+	github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
 	github.com/morikuni/aec v1.0.0 // indirect
 	github.com/opencontainers/go-digest v1.0.0 // indirect
 	github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799 // indirect
@@ -128,7 +130,12 @@ require (
 	github.com/shabbyrobe/gocovmerge v0.0.0-20180507124511-f6ea450bfb63 // indirect
 	github.com/sirupsen/logrus v1.8.1 // indirect
 	github.com/spf13/pflag v1.0.5 // indirect
+	github.com/xdg-go/pbkdf2 v1.0.0 // indirect
+	github.com/xdg-go/scram v1.1.1 // indirect
+	github.com/xdg-go/stringprep v1.0.3 // indirect
+	github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
 	go.opencensus.io v0.24.0 // indirect
+	golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect
 	golang.org/x/tools v0.1.12 // indirect
 	golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
 	google.golang.org/appengine v1.6.7 // indirect
diff --git a/sdks/go.sum b/sdks/go.sum
index 3d7c21812c0..49045a6b937 100644
--- a/sdks/go.sum
+++ b/sdks/go.sum
@@ -469,6 +469,7 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS
 github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
 github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
 github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
 github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
@@ -580,8 +581,9 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o
 github.com/klauspost/compress v1.9.7/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
 github.com/klauspost/compress v1.11.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
 github.com/klauspost/compress v1.11.13/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
-github.com/klauspost/compress v1.13.1 h1:wXr2uRxZTJXHLly6qhJabee5JqIhTRoLBhDOA74hDEQ=
 github.com/klauspost/compress v1.13.1/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
+github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc=
+github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
 github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
 github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
@@ -639,6 +641,8 @@ github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJ
 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
 github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
 github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
+github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0=
+github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
 github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A=
 github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc=
 github.com/mrunalp/fileutils v0.5.0/go.mod h1:M1WthSahJixYnrXQl/DFQuteStB1weuxD2QJNHXfbSQ=
@@ -816,6 +820,8 @@ github.com/testcontainers/testcontainers-go v0.15.0 h1:3Ex7PUGFv0b2bBsdOv6R42+SK
 github.com/testcontainers/testcontainers-go v0.15.0/go.mod h1:PkohMRH2X8Hib0IWtifVexDfLPVT+tb5E9hsf7cW12w=
 github.com/tetratelabs/wazero v1.0.0-pre.4 h1:RBJQT5OzmORkSp6MmZDWoFEr0zXjk4pmvMKAdeUnsaI=
 github.com/tetratelabs/wazero v1.0.0-pre.4/go.mod h1:u8wrFmpdrykiFK0DFPiFm5a4+0RzsdmXYVtijBKqUVo=
+github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4=
+github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
 github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
 github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
 github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
@@ -831,6 +837,12 @@ github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17
 github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
 github.com/willf/bitset v1.1.11-0.20200630133818-d5bec3311243/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
 github.com/willf/bitset v1.1.11/go.mod h1:83CECat5yLh5zVOf4P1ErAgKA5UDvKtgyUABdr3+MjI=
+github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
+github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
+github.com/xdg-go/scram v1.1.1 h1:VOMT+81stJgXW3CpHyqHN3AXDYIMsx56mEFrB37Mb/E=
+github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g=
+github.com/xdg-go/stringprep v1.0.3 h1:kdwGpVNwPFtjs98xCGkHjQtGKh86rDcRZN17QEMCOIs=
+github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8=
 github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
 github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
 github.com/xeipuuv/gojsonschema v0.0.0-20180618132009-1d523034197f/go.mod h1:5yf86TLmAcydyeJq5YvxkGPE2fm/u4myDekKRoLuqhs=
@@ -843,6 +855,8 @@ github.com/xitongsys/parquet-go-source v0.0.0-20200817004010-026bad9b25d0/go.mod
 github.com/xitongsys/parquet-go-source v0.0.0-20220315005136-aec0fe3e777c h1:UDtocVeACpnwauljUbeHD9UOjjcvF5kLUHruww7VT9A=
 github.com/xitongsys/parquet-go-source v0.0.0-20220315005136-aec0fe3e777c/go.mod h1:qLb2Itmdcp7KPa5KZKvhE9U1q5bYSOmgeOckF/H2rQA=
 github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
+github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d h1:splanxYIlg+5LfHAM6xpdFEAYOk8iySO56hMFq6uLyA=
+github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA=
 github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
 github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
 github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
@@ -854,6 +868,8 @@ go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
 go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
 go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4=
 go.etcd.io/etcd v0.5.0-alpha.5.0.20200910180754-dd1b699fc489/go.mod h1:yVHk9ub3CSBatqGNg7GRmsnfLWtoW60w4eDYfh7vHDg=
+go.mongodb.org/mongo-driver v1.11.1 h1:QP0znIRTuL0jf1oBQoAoM0C6ZJfBK4kx0Uumtv1A7w8=
+go.mongodb.org/mongo-driver v1.11.1/go.mod h1:s7p5vEtfbeR1gYi6pnj3c3/urpbLv2T5Sfd6Rp2HBB8=
 go.mozilla.org/pkcs7 v0.0.0-20200128120323-432b2356ecb1/go.mod h1:SNgMg+EgDFwmvSmLRTNKC5fegJjB7v23qTQ0XLGUNHk=
 go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
 go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
@@ -881,6 +897,8 @@ golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPh
 golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
 golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
 golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=
+golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY=
+golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
 golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
 golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
 golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
@@ -953,6 +971,7 @@ golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v
 golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
 golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
 golang.org/x/net v0.0.0-20210825183410-e898025ed96a/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
+golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/net v0.4.0 h1:Q5QPcMlvfxFTAPV0+07Xz/MpK9NTXu2VDUuy0FeMfaU=
 golang.org/x/net v0.4.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@@ -1044,6 +1063,7 @@ golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7w
 golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210426230700-d19ff857e887/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20210906170528-6f6e22806c34/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
@@ -1060,6 +1080,7 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
 golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
 golang.org/x/text v0.5.0 h1:OLmvp0KP+FVG99Ct/qFiL/Fhk4zp4QQnZ7b2U+5piUM=
 golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
 golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
diff --git a/sdks/go/pkg/beam/io/mongodbio/coder.go b/sdks/go/pkg/beam/io/mongodbio/coder.go
new file mode 100644
index 00000000000..c140f9a8a25
--- /dev/null
+++ b/sdks/go/pkg/beam/io/mongodbio/coder.go
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mongodbio
+
+import (
+	"fmt"
+	"reflect"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+)
+
+func init() {
+	beam.RegisterCoder(
+		reflect.TypeOf((*bson.M)(nil)).Elem(),
+		encodeBSONMap,
+		decodeBSONMap,
+	)
+	beam.RegisterCoder(
+		reflect.TypeOf((*primitive.ObjectID)(nil)).Elem(),
+		encodeObjectID,
+		decodeObjectID,
+	)
+}
+
+func encodeBSONMap(m bson.M) ([]byte, error) {
+	bytes, err := bson.Marshal(m)
+	if err != nil {
+		return nil, fmt.Errorf("error encoding BSON: %w", err)
+	}
+
+	return bytes, nil
+}
+
+func decodeBSONMap(bytes []byte) (bson.M, error) {
+	var out bson.M
+	if err := bson.Unmarshal(bytes, &out); err != nil {
+		return nil, fmt.Errorf("error decoding BSON: %w", err)
+	}
+
+	return out, nil
+}
+
+func encodeObjectID(objectID primitive.ObjectID) []byte {
+	return objectID[:]
+}
+
+func decodeObjectID(bytes []byte) primitive.ObjectID {
+	var out primitive.ObjectID
+
+	copy(out[:], bytes[:])
+
+	return out
+}
diff --git a/sdks/go/pkg/beam/io/mongodbio/coder_test.go b/sdks/go/pkg/beam/io/mongodbio/coder_test.go
new file mode 100644
index 00000000000..d5e3bb2974d
--- /dev/null
+++ b/sdks/go/pkg/beam/io/mongodbio/coder_test.go
@@ -0,0 +1,160 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mongodbio
+
+import (
+	"testing"
+
+	"github.com/google/go-cmp/cmp"
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+)
+
+func Test_encodeBSONMap(t *testing.T) {
+	tests := []struct {
+		name    string
+		m       bson.M
+		want    []byte
+		wantErr bool
+	}{
+		{
+			name:    "Encode bson.M",
+			m:       bson.M{"key": "val"},
+			want:    []byte{18, 0, 0, 0, 2, 107, 101, 121, 0, 4, 0, 0, 0, 118, 97, 108, 0, 0},
+			wantErr: false,
+		},
+		{
+			name:    "Encode empty bson.M",
+			m:       bson.M{},
+			want:    []byte{5, 0, 0, 0, 0},
+			wantErr: false,
+		},
+		{
+			name:    "Encode nil bson.M",
+			m:       bson.M(nil),
+			want:    []byte{5, 0, 0, 0, 0},
+			wantErr: false,
+		},
+		{
+			name:    "Error - invalid bson.M",
+			m:       bson.M{"key": make(chan int)},
+			wantErr: true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got, err := encodeBSONMap(tt.m)
+			if (err != nil) != tt.wantErr {
+				t.Fatalf("encodeBSONMap() error = %v, wantErr %v", err, tt.wantErr)
+			}
+
+			if !cmp.Equal(got, tt.want) {
+				t.Errorf("encodeBSONMap() got = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func Test_decodeBSONMap(t *testing.T) {
+	tests := []struct {
+		name    string
+		bytes   []byte
+		want    bson.M
+		wantErr bool
+	}{
+		{
+			name:    "Decode bson.M",
+			bytes:   []byte{18, 0, 0, 0, 2, 107, 101, 121, 0, 4, 0, 0, 0, 118, 97, 108, 0, 0},
+			want:    bson.M{"key": "val"},
+			wantErr: false,
+		},
+		{
+			name:    "Decode empty bson.M",
+			bytes:   []byte{5, 0, 0, 0, 0},
+			want:    bson.M{},
+			wantErr: false,
+		},
+		{
+			name:    "Error - invalid bson.M",
+			bytes:   []byte{},
+			wantErr: true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got, err := decodeBSONMap(tt.bytes)
+			if (err != nil) != tt.wantErr {
+				t.Fatalf("decodeBSONMap() error = %v, wantErr %v", err, tt.wantErr)
+			}
+
+			if !cmp.Equal(got, tt.want) {
+				t.Errorf("decodeBSONMap() got = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func Test_encodeObjectID(t *testing.T) {
+	tests := []struct {
+		name     string
+		objectID primitive.ObjectID
+		want     []byte
+	}{
+		{
+			name:     "Encode object ID",
+			objectID: objectIDFromHex(t, "5f1b2c3d4e5f60708090a0b0"),
+			want:     []byte{95, 27, 44, 61, 78, 95, 96, 112, 128, 144, 160, 176},
+		},
+		{
+			name:     "Encode nil object ID",
+			objectID: primitive.NilObjectID,
+			want:     []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := encodeObjectID(tt.objectID); !cmp.Equal(got, tt.want) {
+				t.Errorf("encodeObjectID() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func Test_decodeObjectID(t *testing.T) {
+	tests := []struct {
+		name  string
+		bytes []byte
+		want  primitive.ObjectID
+	}{
+		{
+			name:  "Decode object ID",
+			bytes: []byte{95, 27, 44, 61, 78, 95, 96, 112, 128, 144, 160, 176},
+			want:  objectIDFromHex(t, "5f1b2c3d4e5f60708090a0b0"),
+		},
+		{
+			name:  "Decode nil object ID",
+			bytes: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
+			want:  primitive.NilObjectID,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := decodeObjectID(tt.bytes); !cmp.Equal(got, tt.want) {
+				t.Errorf("decodeObjectID() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
diff --git a/sdks/go/pkg/beam/io/mongodbio/common.go b/sdks/go/pkg/beam/io/mongodbio/common.go
new file mode 100644
index 00000000000..9d6ffbeaa95
--- /dev/null
+++ b/sdks/go/pkg/beam/io/mongodbio/common.go
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package mongodbio contains transforms for reading from and writing to MongoDB.
+package mongodbio
+
+import (
+	"context"
+	"fmt"
+
+	"go.mongodb.org/mongo-driver/mongo"
+	"go.mongodb.org/mongo-driver/mongo/options"
+	"go.mongodb.org/mongo-driver/mongo/readpref"
+)
+
+const (
+	bsonTag = "bson"
+)
+
+type mongoDBFn struct {
+	URI        string
+	Database   string
+	Collection string
+	client     *mongo.Client
+	collection *mongo.Collection
+}
+
+func (fn *mongoDBFn) Setup(ctx context.Context) error {
+	client, err := newClient(ctx, fn.URI)
+	if err != nil {
+		return err
+	}
+
+	fn.client = client
+	fn.collection = client.Database(fn.Database).Collection(fn.Collection)
+
+	return nil
+}
+
+func newClient(ctx context.Context, uri string) (*mongo.Client, error) {
+	opts := options.Client().ApplyURI(uri)
+
+	client, err := mongo.Connect(ctx, opts)
+	if err != nil {
+		return nil, fmt.Errorf("error connecting to MongoDB: %w", err)
+	}
+
+	if err := client.Ping(ctx, readpref.Primary()); err != nil {
+		return nil, fmt.Errorf("error pinging MongoDB: %w", err)
+	}
+
+	return client, nil
+}
+
+func (fn *mongoDBFn) Teardown(ctx context.Context) error {
+	if err := fn.client.Disconnect(ctx); err != nil {
+		return fmt.Errorf("error disconnecting from MongoDB: %w", err)
+	}
+
+	return nil
+}
diff --git a/sdks/go/pkg/beam/io/mongodbio/example_test.go b/sdks/go/pkg/beam/io/mongodbio/example_test.go
new file mode 100644
index 00000000000..3e77303843e
--- /dev/null
+++ b/sdks/go/pkg/beam/io/mongodbio/example_test.go
@@ -0,0 +1,180 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mongodbio_test
+
+import (
+	"context"
+	"log"
+	"reflect"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/mongodbio"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+)
+
+func ExampleRead_default() {
+	type Event struct {
+		ID        primitive.ObjectID `bson:"_id"`
+		Timestamp int64              `bson:"timestamp"`
+		EventType int32              `bson:"event_type"`
+	}
+
+	beam.Init()
+	p, s := beam.NewPipelineWithRoot()
+
+	col := mongodbio.Read(
+		s,
+		"mongodb://localhost:27017",
+		"demo",
+		"events",
+		reflect.TypeOf(Event{}),
+	)
+	debug.Print(s, col)
+
+	if err := beamx.Run(context.Background(), p); err != nil {
+		log.Fatalf("Failed to execute job: %v", err)
+	}
+}
+
+func ExampleRead_options() {
+	type Event struct {
+		ID        primitive.ObjectID `bson:"_id"`
+		Timestamp int64              `bson:"timestamp"`
+		EventType int32              `bson:"event_type"`
+	}
+
+	beam.Init()
+	p, s := beam.NewPipelineWithRoot()
+
+	col := mongodbio.Read(
+		s,
+		"mongodb://localhost:27017",
+		"demo",
+		"events",
+		reflect.TypeOf(Event{}),
+		mongodbio.WithReadBucketAuto(true),
+		mongodbio.WithReadBundleSize(32*1024*1024),
+		mongodbio.WithReadFilter(bson.M{"timestamp": bson.M{"$gt": 1640995200000}}),
+	)
+	debug.Print(s, col)
+
+	if err := beamx.Run(context.Background(), p); err != nil {
+		log.Fatalf("Failed to execute job: %v", err)
+	}
+}
+
+func ExampleWrite_default() {
+	type Event struct {
+		ID        primitive.ObjectID `bson:"_id"`
+		Timestamp int64              `bson:"timestamp"`
+		EventType int32              `bson:"event_type"`
+	}
+
+	beam.Init()
+	p, s := beam.NewPipelineWithRoot()
+
+	input := []Event{
+		{
+			ID:        primitive.NewObjectIDFromTimestamp(time.UnixMilli(1640995200001)),
+			Timestamp: 1640995200001,
+			EventType: 1,
+		},
+		{
+			ID:        primitive.NewObjectIDFromTimestamp(time.UnixMilli(1640995200002)),
+			Timestamp: 1640995200002,
+			EventType: 2,
+		},
+	}
+
+	col := beam.CreateList(s, input)
+	mongodbio.Write(s, "mongodb://localhost:27017", "demo", "events", col)
+
+	if err := beamx.Run(context.Background(), p); err != nil {
+		log.Fatalf("Failed to execute job: %v", err)
+	}
+}
+
+func ExampleWrite_options() {
+	type Event struct {
+		ID        primitive.ObjectID `bson:"_id"`
+		Timestamp int64              `bson:"timestamp"`
+		EventType int32              `bson:"event_type"`
+	}
+
+	beam.Init()
+	p, s := beam.NewPipelineWithRoot()
+
+	input := []Event{
+		{
+			ID:        primitive.NewObjectIDFromTimestamp(time.UnixMilli(1640995200001)),
+			Timestamp: 1640995200001,
+			EventType: 1,
+		},
+		{
+			ID:        primitive.NewObjectIDFromTimestamp(time.UnixMilli(1640995200002)),
+			Timestamp: 1640995200002,
+			EventType: 2,
+		},
+	}
+
+	col := beam.CreateList(s, input)
+	mongodbio.Write(
+		s,
+		"mongodb://localhost:27017",
+		"demo",
+		"events",
+		col,
+		mongodbio.WithWriteBatchSize(500),
+		mongodbio.WithWriteOrdered(false),
+	)
+
+	if err := beamx.Run(context.Background(), p); err != nil {
+		log.Fatalf("Failed to execute job: %v", err)
+	}
+}
+
+func ExampleWrite_generateID() {
+	type Event struct {
+		Timestamp int64 `bson:"timestamp"`
+		EventType int32 `bson:"event_type"`
+	}
+
+	beam.Init()
+	p, s := beam.NewPipelineWithRoot()
+
+	input := []Event{
+		{
+			Timestamp: 1640995200001,
+			EventType: 1,
+		},
+		{
+			Timestamp: 1640995200002,
+			EventType: 1,
+		},
+	}
+
+	col := beam.CreateList(s, input)
+	ids := mongodbio.Write(s, "mongodb://localhost:27017", "demo", "events", col)
+	debug.Print(s, ids)
+
+	if err := beamx.Run(context.Background(), p); err != nil {
+		log.Fatalf("Failed to execute job: %v", err)
+	}
+}
diff --git a/sdks/go/pkg/beam/io/mongodbio/helper_test.go b/sdks/go/pkg/beam/io/mongodbio/helper_test.go
new file mode 100644
index 00000000000..c5a63c15adb
--- /dev/null
+++ b/sdks/go/pkg/beam/io/mongodbio/helper_test.go
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mongodbio
+
+import (
+	"testing"
+
+	"go.mongodb.org/mongo-driver/bson/primitive"
+)
+
+func objectIDFromHex(t *testing.T, hex string) primitive.ObjectID {
+	t.Helper()
+
+	id, err := primitive.ObjectIDFromHex(hex)
+	if err != nil {
+		t.Fatalf("error parsing hex string to primitive.ObjectID: %v", err)
+	}
+
+	return id
+}
diff --git a/sdks/go/pkg/beam/io/mongodbio/read.go b/sdks/go/pkg/beam/io/mongodbio/read.go
new file mode 100644
index 00000000000..e09f2f1e1af
--- /dev/null
+++ b/sdks/go/pkg/beam/io/mongodbio/read.go
@@ -0,0 +1,492 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mongodbio
+
+import (
+	"context"
+	"fmt"
+	"math"
+	"reflect"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/util/structx"
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/mongo"
+	"go.mongodb.org/mongo-driver/mongo/options"
+	"go.mongodb.org/mongo-driver/mongo/readpref"
+)
+
const (
	// defaultReadBundleSize is the default bundle size in bytes (64 MB) used
	// when no WithReadBundleSize option is supplied.
	defaultReadBundleSize = 64 * 1024 * 1024

	// Requested bundle sizes are clamped to this range (1 MB to 1 GB) before
	// being passed as maxChunkSizeBytes to the splitVector command.
	minSplitVectorChunkSize = 1024 * 1024
	maxSplitVectorChunkSize = 1024 * 1024 * 1024

	// maxBucketCount caps the bucket count requested from the $bucketAuto
	// aggregation stage, which takes a 32-bit integer.
	maxBucketCount = math.MaxInt32
)
+
// init registers the read-side DoFns and their emitted element types with
// the Beam SDK so they can be serialized and executed by distributed runners.
func init() {
	register.DoFn3x1[context.Context, []byte, func(bson.M), error](&bucketAutoFn{})
	register.DoFn3x1[context.Context, []byte, func(bson.M), error](&splitVectorFn{})
	register.Emitter1[bson.M]()

	register.DoFn3x1[context.Context, bson.M, func(beam.Y), error](&readFn{})
	register.Emitter1[beam.Y]()
}
+
// Read reads a MongoDB collection and returns a PCollection<T> for a given type T. T must be a
// struct with exported fields that should have a "bson" tag. By default, the transform uses the
// MongoDB internal splitVector command to split the collection into bundles. The transform can be
// configured to use the $bucketAuto aggregation instead to support reading from MongoDB Atlas
// where the splitVector command is not allowed. This is enabled by passing the ReadOptionFn
// WithReadBucketAuto(true).
//
// The Read transform has the required parameters:
//   - s: the scope of the pipeline
//   - uri: the MongoDB connection string
//   - database: the MongoDB database to read from
//   - collection: the MongoDB collection to read from
//   - t: the type of the elements in the collection
//
// The Read transform takes a variadic number of ReadOptionFn which can set the ReadOption fields:
//   - BucketAuto: whether to use the bucketAuto aggregation to split the collection into bundles.
//     Defaults to false
//   - Filter: a bson.M map that is used to filter the documents in the collection. Defaults to nil,
//     which means no filter is applied
//   - BundleSize: the size in bytes to bundle the documents into when reading. Defaults to
//     64 * 1024 * 1024 (64 MB)
func Read(
	s beam.Scope,
	uri string,
	database string,
	collection string,
	t reflect.Type,
	opts ...ReadOptionFn,
) beam.PCollection {
	s = s.Scope("mongodbio.Read")

	// Start from the defaults and let each option function mutate the struct;
	// an option returning an error indicates invalid user input, so fail fast.
	option := &ReadOption{
		BundleSize: defaultReadBundleSize,
	}

	for _, opt := range opts {
		if err := opt(option); err != nil {
			panic(fmt.Sprintf("mongodbio.Read: invalid option: %v", err))
		}
	}

	// A single impulse drives one execution of the bundling DoFn, which emits
	// one _id-range filter document per bundle to read.
	imp := beam.Impulse(s)

	var bundled beam.PCollection

	if option.BucketAuto {
		bundled = beam.ParDo(s, newBucketAutoFn(uri, database, collection, option), imp)
	} else {
		bundled = beam.ParDo(s, newSplitVectorFn(uri, database, collection, option), imp)
	}

	// Each filter is expanded into the matching documents, decoded into
	// values of the caller-supplied type t.
	return beam.ParDo(
		s,
		newReadFn(uri, database, collection, t, option),
		bundled,
		beam.TypeDefinition{Var: beam.YType, T: t},
	)
}
+
// bucketAutoFn is a DoFn that splits a collection into contiguous _id ranges
// using the $bucketAuto aggregation stage. It is used instead of splitVector
// when reading from deployments (such as MongoDB Atlas) where the splitVector
// command is not allowed.
type bucketAutoFn struct {
	mongoDBFn
	// BundleSize is the target bundle size in bytes, used to derive the
	// number of buckets from the collection size.
	BundleSize int64
}

// newBucketAutoFn creates a bucketAutoFn for the given connection parameters
// and read options.
func newBucketAutoFn(
	uri string,
	database string,
	collection string,
	option *ReadOption,
) *bucketAutoFn {
	return &bucketAutoFn{
		mongoDBFn: mongoDBFn{
			URI:        uri,
			Database:   database,
			Collection: collection,
		},
		BundleSize: option.BundleSize,
	}
}
+
// ProcessElement measures the collection size, derives a bucket count from
// the configured bundle size, runs a $bucketAuto aggregation to obtain
// contiguous _id ranges, and emits one filter document per range.
func (fn *bucketAutoFn) ProcessElement(
	ctx context.Context,
	_ []byte,
	emit func(bson.M),
) error {
	collectionSize, err := fn.getCollectionSize(ctx)
	if err != nil {
		return err
	}

	// An empty collection produces no bundles, and therefore no output.
	if collectionSize == 0 {
		return nil
	}

	bucketCount := calculateBucketCount(collectionSize, fn.BundleSize)

	buckets, err := fn.getBuckets(ctx, bucketCount)
	if err != nil {
		return err
	}

	idFilters := idFiltersFromBuckets(buckets)

	for _, filter := range idFilters {
		emit(filter)
	}

	return nil
}
+
// collStats holds the subset of the collStats command output used here.
type collStats struct {
	// Size is the collection data size in bytes, as reported by collStats.
	Size int64 `bson:"size"`
}

// getCollectionSize runs the collStats command against the primary node and
// returns the collection's data size in bytes.
func (fn *bucketAutoFn) getCollectionSize(ctx context.Context) (int64, error) {
	cmd := bson.M{"collStats": fn.Collection}
	// Read from the primary so the size estimate reflects the most recent
	// state of the collection.
	opts := options.RunCmd().SetReadPreference(readpref.Primary())

	var stats collStats
	if err := fn.collection.Database().RunCommand(ctx, cmd, opts).Decode(&stats); err != nil {
		return 0, fmt.Errorf("error executing collStats command: %w", err)
	}

	return stats.Size, nil
}
+
// calculateBucketCount returns the number of buckets needed to cover a
// collection of collectionSize bytes with bundles of bundleSize bytes. The
// result is the ceiling of collectionSize / bundleSize, capped at
// math.MaxInt32 because $bucketAuto takes a 32-bit bucket count.
//
// It panics if bundleSize is not greater than 0.
func calculateBucketCount(collectionSize int64, bundleSize int64) int32 {
	if bundleSize <= 0 {
		// The original guard only rejected negative values, so a bundle size
		// of 0 fell through to an integer divide-by-zero below with a
		// confusing runtime panic. Reject zero explicitly instead.
		panic("mongodbio.calculateBucketCount: bundle size must be greater than 0")
	}

	// Ceiling division without floating point.
	count := collectionSize / bundleSize
	if collectionSize%bundleSize != 0 {
		count++
	}

	if count > int64(math.MaxInt32) {
		count = math.MaxInt32
	}

	return int32(count)
}
+
// bucket mirrors one output document of the $bucketAuto stage: an _id range
// with inclusive min/max boundary values.
type bucket struct {
	ID minMax `bson:"_id"`
}

// minMax holds the boundary values of a bucket. The values are `any` because
// the _id field can be of any BSON type.
type minMax struct {
	Min any `bson:"min"`
	Max any `bson:"max"`
}

// getBuckets runs a $bucketAuto aggregation grouping on _id and returns the
// resulting buckets, which partition the collection into `count` roughly
// evenly sized ranges.
func (fn *bucketAutoFn) getBuckets(ctx context.Context, count int32) ([]bucket, error) {
	pipeline := mongo.Pipeline{bson.D{{
		Key: "$bucketAuto",
		Value: bson.M{
			"groupBy": "$_id",
			"buckets": count,
		},
	}}}

	cursor, err := fn.collection.Aggregate(ctx, pipeline)
	if err != nil {
		return nil, fmt.Errorf("error executing bucketAuto aggregation: %w", err)
	}

	var buckets []bucket
	if err = cursor.All(ctx, &buckets); err != nil {
		return nil, fmt.Errorf("error decoding buckets: %w", err)
	}

	return buckets, nil
}
+
+func idFiltersFromBuckets(buckets []bucket) []bson.M {
+	idFilters := make([]bson.M, len(buckets))
+
+	for i := 0; i < len(buckets); i++ {
+		filter := bson.M{}
+
+		if i != 0 {
+			filter["$gt"] = buckets[i].ID.Min
+		}
+
+		if i != len(buckets)-1 {
+			filter["$lte"] = buckets[i].ID.Max
+		}
+
+		if len(filter) == 0 {
+			idFilters[i] = filter
+		} else {
+			idFilters[i] = bson.M{"_id": filter}
+		}
+	}
+
+	return idFilters
+}
+
// splitVectorFn is a DoFn that splits a collection into contiguous _id ranges
// using the MongoDB internal splitVector command. This is the default
// bundling strategy for Read.
type splitVectorFn struct {
	mongoDBFn
	// BundleSize is the target bundle size in bytes; it is clamped to the
	// range accepted by splitVector before use.
	BundleSize int64
}

// newSplitVectorFn creates a splitVectorFn for the given connection
// parameters and read options.
func newSplitVectorFn(
	uri string,
	database string,
	collection string,
	option *ReadOption,
) *splitVectorFn {
	return &splitVectorFn{
		mongoDBFn: mongoDBFn{
			URI:        uri,
			Database:   database,
			Collection: collection,
		},
		BundleSize: option.BundleSize,
	}
}
+
// ProcessElement runs the splitVector command to obtain split keys that
// partition the collection by _id, then emits one filter document per
// resulting range (one more range than there are split keys).
func (fn *splitVectorFn) ProcessElement(
	ctx context.Context,
	_ []byte,
	emit func(bson.M),
) error {
	chunkSize := getChunkSize(fn.BundleSize)

	splitKeys, err := fn.getSplitKeys(ctx, chunkSize)
	if err != nil {
		return err
	}

	idFilters := idFiltersFromSplits(splitKeys)

	for _, filter := range idFilters {
		emit(filter)
	}

	return nil
}
+
+func getChunkSize(bundleSize int64) int64 {
+	var chunkSize int64
+
+	if bundleSize < minSplitVectorChunkSize {
+		chunkSize = minSplitVectorChunkSize
+	} else if bundleSize > maxSplitVectorChunkSize {
+		chunkSize = maxSplitVectorChunkSize
+	} else {
+		chunkSize = bundleSize
+	}
+
+	return chunkSize
+}
+
// splitVector holds the subset of the splitVector command output used here.
type splitVector struct {
	SplitKeys []splitKey `bson:"splitKeys"`
}

// splitKey is a single _id boundary returned by splitVector. The value is
// `any` because the _id field can be of any BSON type.
type splitKey struct {
	ID any `bson:"_id"`
}

// getSplitKeys runs the splitVector command against the primary node,
// requesting chunks of at most chunkSize bytes keyed on _id, and returns the
// resulting split keys.
func (fn *splitVectorFn) getSplitKeys(ctx context.Context, chunkSize int64) ([]splitKey, error) {
	cmd := bson.D{
		{Key: "splitVector", Value: fmt.Sprintf("%s.%s", fn.Database, fn.Collection)},
		{Key: "keyPattern", Value: bson.D{{Key: "_id", Value: 1}}},
		{Key: "maxChunkSizeBytes", Value: chunkSize},
	}

	opts := options.RunCmd().SetReadPreference(readpref.Primary())

	var vector splitVector
	if err := fn.collection.Database().RunCommand(ctx, cmd, opts).Decode(&vector); err != nil {
		return nil, fmt.Errorf("error executing splitVector command: %w", err)
	}

	return vector.SplitKeys, nil
}
+
+func idFiltersFromSplits(splitKeys []splitKey) []bson.M {
+	idFilters := make([]bson.M, len(splitKeys)+1)
+
+	for i := 0; i < len(splitKeys)+1; i++ {
+		filter := bson.M{}
+
+		if i > 0 {
+			filter["$gt"] = splitKeys[i-1].ID
+		}
+
+		if i < len(splitKeys) {
+			filter["$lte"] = splitKeys[i].ID
+		}
+
+		if len(filter) == 0 {
+			idFilters[i] = filter
+		} else {
+			idFilters[i] = bson.M{"_id": filter}
+		}
+	}
+
+	return idFilters
+}
+
// readFn is a DoFn that reads the documents matching an _id-range filter and
// decodes each into a value of the configured type.
type readFn struct {
	mongoDBFn
	// Filter is the user-supplied filter carried as BSON-encoded bytes so the
	// DoFn can be serialized by the runner; it is decoded back into a bson.M
	// in Setup.
	Filter []byte
	// Type is the element type to decode documents into.
	Type beam.EncodedType
	// projection and filter are derived in Setup and not serialized.
	projection bson.D
	filter     bson.M
}

// newReadFn creates a readFn for the given connection parameters, element
// type, and read options. It panics if the custom filter cannot be
// BSON-encoded, since that indicates invalid user input at construction time.
func newReadFn(
	uri string,
	database string,
	collection string,
	t reflect.Type,
	option *ReadOption,
) *readFn {
	filter, err := encodeBSONMap(option.Filter)
	if err != nil {
		panic(fmt.Sprintf("mongodbio.newReadFn: %v", err))
	}

	return &readFn{
		mongoDBFn: mongoDBFn{
			URI:        uri,
			Database:   database,
			Collection: collection,
		},
		Filter: filter,
		Type:   beam.EncodedType{T: t},
	}
}
+
// Setup connects to MongoDB (via the embedded mongoDBFn), decodes the
// serialized custom filter, and infers the query projection from the element
// type's bson tags.
func (fn *readFn) Setup(ctx context.Context) error {
	if err := fn.mongoDBFn.Setup(ctx); err != nil {
		return err
	}

	filter, err := decodeBSONMap(fn.Filter)
	if err != nil {
		return err
	}

	fn.filter = filter
	fn.projection = inferProjection(fn.Type.T, bsonTag)

	return nil
}
+
+func inferProjection(t reflect.Type, tagKey string) bson.D {
+	names := structx.InferFieldNames(t, tagKey)
+	if len(names) == 0 {
+		panic("mongodbio.inferProjection: no names to infer projection from")
+	}
+
+	projection := make(bson.D, len(names))
+
+	for i, name := range names {
+		projection[i] = bson.E{Key: name, Value: 1}
+	}
+
+	return projection
+}
+
// ProcessElement merges the bundle's _id-range filter with the user-supplied
// filter, queries the matching documents, decodes each into a value of the
// configured type, and emits it.
//
// The error return is named so the deferred cursor close can surface a close
// error without masking an earlier processing error.
func (fn *readFn) ProcessElement(
	ctx context.Context,
	elem bson.M,
	emit func(beam.Y),
) (err error) {
	mergedFilter := mergeFilters(elem, fn.filter)

	cursor, err := fn.findDocuments(ctx, fn.projection, mergedFilter)
	if err != nil {
		return err
	}

	defer func() {
		closeErr := cursor.Close(ctx)

		// If processing already failed, keep that error and only log the
		// close error; otherwise report the close error (if any) to the
		// caller.
		if err != nil {
			if closeErr != nil {
				log.Errorf(ctx, "error closing cursor: %v", closeErr)
			}
			return
		}

		err = closeErr
	}()

	for cursor.Next(ctx) {
		value, err := decodeDocument(cursor, fn.Type.T)
		if err != nil {
			return err
		}

		emit(value)
	}

	// cursor.Err reports any error that terminated the iteration early.
	return cursor.Err()
}
+
+func mergeFilters(idFilter bson.M, customFilter bson.M) bson.M {
+	if len(idFilter) == 0 {
+		return customFilter
+	}
+
+	if len(customFilter) == 0 {
+		return idFilter
+	}
+
+	return bson.M{
+		"$and": []bson.M{idFilter, customFilter},
+	}
+}
+
// findDocuments queries the collection with the given filter and projection
// and returns the resulting cursor. The caller is responsible for closing
// the cursor. AllowDiskUse lets the server spill to disk for large result
// sets.
func (fn *readFn) findDocuments(
	ctx context.Context,
	projection bson.D,
	filter bson.M,
) (*mongo.Cursor, error) {
	opts := options.Find().SetProjection(projection).SetAllowDiskUse(true)

	cursor, err := fn.collection.Find(ctx, filter, opts)
	if err != nil {
		return nil, fmt.Errorf("error finding documents: %w", err)
	}

	return cursor, nil
}
+
// decodeDocument decodes the cursor's current document into a newly allocated
// value of type t and returns it as a (non-pointer) value of that type.
func decodeDocument(cursor *mongo.Cursor, t reflect.Type) (any, error) {
	// Decode requires a pointer target, so allocate a *t and dereference
	// after decoding.
	out := reflect.New(t).Interface()
	if err := cursor.Decode(out); err != nil {
		return nil, fmt.Errorf("error decoding document: %w", err)
	}

	value := reflect.ValueOf(out).Elem().Interface()

	return value, nil
}
diff --git a/sdks/go/pkg/beam/io/mongodbio/read_option.go b/sdks/go/pkg/beam/io/mongodbio/read_option.go
new file mode 100644
index 00000000000..b724c306a81
--- /dev/null
+++ b/sdks/go/pkg/beam/io/mongodbio/read_option.go
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mongodbio
+
+import (
+	"errors"
+
+	"go.mongodb.org/mongo-driver/bson"
+)
+
// ReadOption represents options for reading from MongoDB.
type ReadOption struct {
	// BucketAuto selects the $bucketAuto aggregation instead of the
	// splitVector command for bundling.
	BucketAuto bool
	// Filter restricts the documents read; nil means no filter.
	Filter bson.M
	// BundleSize is the target bundle size in bytes.
	BundleSize int64
}

// ReadOptionFn is a function that configures a ReadOption.
type ReadOptionFn func(option *ReadOption) error
+
+// WithReadBucketAuto configures the ReadOption whether to use the bucketAuto aggregation stage.
+func WithReadBucketAuto(bucketAuto bool) ReadOptionFn {
+	return func(o *ReadOption) error {
+		o.BucketAuto = bucketAuto
+		return nil
+	}
+}
+
+// WithReadFilter configures the ReadOption to use the provided filter.
+func WithReadFilter(filter bson.M) ReadOptionFn {
+	return func(o *ReadOption) error {
+		o.Filter = filter
+		return nil
+	}
+}
+
+// WithReadBundleSize configures the ReadOption to use the provided bundle size in bytes.
+func WithReadBundleSize(bundleSize int64) ReadOptionFn {
+	return func(o *ReadOption) error {
+		if bundleSize <= 0 {
+			return errors.New("bundle size must be greater than 0")
+		}
+
+		o.BundleSize = bundleSize
+		return nil
+	}
+}
diff --git a/sdks/go/pkg/beam/io/mongodbio/read_option_test.go b/sdks/go/pkg/beam/io/mongodbio/read_option_test.go
new file mode 100644
index 00000000000..d4fe4dfa63a
--- /dev/null
+++ b/sdks/go/pkg/beam/io/mongodbio/read_option_test.go
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mongodbio
+
+import (
+	"testing"
+
+	"github.com/google/go-cmp/cmp"
+	"go.mongodb.org/mongo-driver/bson"
+)
+
// TestWithReadBucketAuto verifies that the option function stores the
// BucketAuto flag on the ReadOption.
func TestWithReadBucketAuto(t *testing.T) {
	tests := []struct {
		name       string
		bucketAuto bool
		want       bool
		wantErr    bool
	}{
		{
			name:       "Set bucket auto to true",
			bucketAuto: true,
			want:       true,
			wantErr:    false,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			var option ReadOption

			if err := WithReadBucketAuto(tt.bucketAuto)(&option); (err != nil) != tt.wantErr {
				t.Fatalf("WithReadBucketAuto() error = %v, wantErr %v", err, tt.wantErr)
			}

			if option.BucketAuto != tt.want {
				t.Errorf("option.BucketAuto = %v, want %v", option.BucketAuto, tt.want)
			}
		})
	}
}
+
// TestWithReadFilter verifies that the option function stores the custom
// filter on the ReadOption.
func TestWithReadFilter(t *testing.T) {
	tests := []struct {
		name    string
		filter  bson.M
		want    bson.M
		wantErr bool
	}{
		{
			name:    "Set filter to {\"key\": \"value\"}",
			filter:  bson.M{"key": "value"},
			want:    bson.M{"key": "value"},
			wantErr: false,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			var option ReadOption

			if err := WithReadFilter(tt.filter)(&option); (err != nil) != tt.wantErr {
				t.Fatalf("WithReadFilter() error = %v, wantErr %v", err, tt.wantErr)
			}

			if !cmp.Equal(option.Filter, tt.want) {
				t.Errorf("option.Filter = %v, want %v", option.Filter, tt.want)
			}
		})
	}
}
+
// TestWithReadBundleSize verifies that the option function stores a positive
// bundle size and rejects non-positive values with an error.
func TestWithReadBundleSize(t *testing.T) {
	tests := []struct {
		name       string
		bundleSize int64
		want       int64
		wantErr    bool
	}{
		{
			name:       "Set bundle size to 1024",
			bundleSize: 1024,
			want:       1024,
			wantErr:    false,
		},
		{
			name:       "Error - bundle size must be greater than 0",
			bundleSize: 0,
			wantErr:    true,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			var option ReadOption

			if err := WithReadBundleSize(tt.bundleSize)(&option); (err != nil) != tt.wantErr {
				t.Fatalf("WithReadBundleSize() error = %v, wantErr %v", err, tt.wantErr)
			}

			if option.BundleSize != tt.want {
				t.Errorf("option.BundleSize = %v, want %v", option.BundleSize, tt.want)
			}
		})
	}
}
diff --git a/sdks/go/pkg/beam/io/mongodbio/read_test.go b/sdks/go/pkg/beam/io/mongodbio/read_test.go
new file mode 100644
index 00000000000..5899457d5a8
--- /dev/null
+++ b/sdks/go/pkg/beam/io/mongodbio/read_test.go
@@ -0,0 +1,393 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mongodbio
+
+import (
+	"math"
+	"reflect"
+	"testing"
+
+	"github.com/google/go-cmp/cmp"
+	"go.mongodb.org/mongo-driver/bson"
+)
+
// Test_calculateBucketCount verifies ceiling division of collection size by
// bundle size and the cap at math.MaxInt32.
func Test_calculateBucketCount(t *testing.T) {
	tests := []struct {
		name           string
		collectionSize int64
		bundleSize     int64
		want           int32
	}{
		{
			name:           "Return ceiling of collection size / bundle size",
			collectionSize: 3 * 1024 * 1024,
			bundleSize:     2 * 1024 * 1024,
			want:           2,
		},
		{
			name:           "Return max int32 when calculated count is greater than max int32",
			collectionSize: 1024 * 1024 * 1024 * 1024,
			bundleSize:     1,
			want:           math.MaxInt32,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if got := calculateBucketCount(tt.collectionSize, tt.bundleSize); got != tt.want {
				t.Errorf("calculateBucketCount() = %v, want %v", got, tt.want)
			}
		})
	}
}
+
// Test_calculateBucketCountPanic verifies that a non-positive bundle size
// causes a panic.
func Test_calculateBucketCountPanic(t *testing.T) {
	t.Run("Panic when bundleSize is not greater than 0", func(t *testing.T) {
		defer func() {
			if r := recover(); r == nil {
				t.Errorf("calculateBucketCount() does not panic")
			}
		}()

		calculateBucketCount(1024, 0)
	})
}
+
// Test_idFiltersFromBuckets verifies that contiguous $bucketAuto ranges are
// converted into boundary filters: unbounded below for the first range,
// unbounded above for the last, and both bounds for ranges in between.
func Test_idFiltersFromBuckets(t *testing.T) {
	tests := []struct {
		name    string
		buckets []bucket
		want    []bson.M
	}{
		{
			name: "Create one $lte filter for start range, one $gt filter for end range, and filters with both " +
				"$lte and $gt for ranges in between when there are three or more bucket elements",
			buckets: []bucket{
				{
					ID: minMax{
						Min: objectIDFromHex(t, "6384e03f24f854c1a8ce5378"),
						Max: objectIDFromHex(t, "6384e03f24f854c1a8ce5380"),
					},
				},
				{
					ID: minMax{
						Min: objectIDFromHex(t, "6384e03f24f854c1a8ce5380"),
						Max: objectIDFromHex(t, "6384e03f24f854c1a8ce5382"),
					},
				},
				{
					ID: minMax{
						Min: objectIDFromHex(t, "6384e03f24f854c1a8ce5382"),
						Max: objectIDFromHex(t, "6384e03f24f854c1a8ce5384"),
					},
				},
			},
			want: []bson.M{
				{
					"_id": bson.M{
						"$lte": objectIDFromHex(t, "6384e03f24f854c1a8ce5380"),
					},
				},
				{
					"_id": bson.M{
						"$gt":  objectIDFromHex(t, "6384e03f24f854c1a8ce5380"),
						"$lte": objectIDFromHex(t, "6384e03f24f854c1a8ce5382"),
					},
				},
				{
					"_id": bson.M{
						"$gt": objectIDFromHex(t, "6384e03f24f854c1a8ce5382"),
					},
				},
			},
		},
		{
			name: "Create one $lte filter for start range and one $gt filter for end range when there are two " +
				"bucket elements",
			buckets: []bucket{
				{
					ID: minMax{
						Min: objectIDFromHex(t, "6384e03f24f854c1a8ce5378"),
						Max: objectIDFromHex(t, "6384e03f24f854c1a8ce5380"),
					},
				},
				{
					ID: minMax{
						Min: objectIDFromHex(t, "6384e03f24f854c1a8ce5380"),
						Max: objectIDFromHex(t, "6384e03f24f854c1a8ce5382"),
					},
				},
			},
			want: []bson.M{
				{
					"_id": bson.M{
						"$lte": objectIDFromHex(t, "6384e03f24f854c1a8ce5380"),
					},
				},
				{
					"_id": bson.M{
						"$gt": objectIDFromHex(t, "6384e03f24f854c1a8ce5380"),
					},
				},
			},
		},
		{
			name: "Create an empty filter when there is one bucket element",
			buckets: []bucket{
				{
					ID: minMax{
						Min: objectIDFromHex(t, "6384e03f24f854c1a8ce5378"),
						Max: objectIDFromHex(t, "6384e03f24f854c1a8ce5380"),
					},
				},
			},
			want: []bson.M{{}},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if got := idFiltersFromBuckets(tt.buckets); !cmp.Equal(got, tt.want) {
				t.Errorf("idFiltersFromBuckets() = %v, want %v", got, tt.want)
			}
		})
	}
}
+
// Test_getChunkSize verifies that the bundle size is clamped to the 1 MB to
// 1 GB range accepted by the splitVector command.
func Test_getChunkSize(t *testing.T) {
	tests := []struct {
		name       string
		bundleSize int64
		want       int64
	}{
		{
			name:       "Return 1 MB if bundle size is less than 1 MB",
			bundleSize: 1024,
			want:       1024 * 1024,
		},
		{
			name:       "Return 1 GB if bundle size is greater than 1 GB",
			bundleSize: 2 * 1024 * 1024 * 1024,
			want:       1024 * 1024 * 1024,
		},
		{
			name:       "Return bundle size if bundle size is between 1 MB and 1 GB",
			bundleSize: 4 * 1024 * 1024,
			want:       4 * 1024 * 1024,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if got := getChunkSize(tt.bundleSize); got != tt.want {
				t.Errorf("getChunkSize() = %v, want %v", got, tt.want)
			}
		})
	}
}
+
// Test_idFiltersFromSplits verifies that N split keys produce N+1 range
// filters, with the outermost ranges left unbounded.
func Test_idFiltersFromSplits(t *testing.T) {
	tests := []struct {
		name      string
		splitKeys []splitKey
		want      []bson.M
	}{
		{
			name: "Create one $lte filter for start range, one $gt filter for end range, and filters with both " +
				"$lte and $gt for ranges in between when there are two or more splitKey elements",
			splitKeys: []splitKey{
				{
					ID: objectIDFromHex(t, "6384e03f24f854c1a8ce5380"),
				},
				{
					ID: objectIDFromHex(t, "6384e03f24f854c1a8ce5382"),
				},
			},
			want: []bson.M{
				{
					"_id": bson.M{
						"$lte": objectIDFromHex(t, "6384e03f24f854c1a8ce5380"),
					},
				},
				{
					"_id": bson.M{
						"$gt":  objectIDFromHex(t, "6384e03f24f854c1a8ce5380"),
						"$lte": objectIDFromHex(t, "6384e03f24f854c1a8ce5382"),
					},
				},
				{
					"_id": bson.M{
						"$gt": objectIDFromHex(t, "6384e03f24f854c1a8ce5382"),
					},
				},
			},
		},
		{
			name: "Create one $lte filter for start range and one $gt filter for end range when there is one " +
				"splitKey element",
			splitKeys: []splitKey{
				{
					ID: objectIDFromHex(t, "6384e03f24f854c1a8ce5380"),
				},
			},
			want: []bson.M{
				{
					"_id": bson.M{
						"$lte": objectIDFromHex(t, "6384e03f24f854c1a8ce5380"),
					},
				},
				{
					"_id": bson.M{
						"$gt": objectIDFromHex(t, "6384e03f24f854c1a8ce5380"),
					},
				},
			},
		},
		{
			name:      "Create an empty filter when there are no splitKey elements",
			splitKeys: nil,
			want:      []bson.M{{}},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if got := idFiltersFromSplits(tt.splitKeys); !cmp.Equal(got, tt.want) {
				t.Errorf("idFiltersFromSplits() = %v, want %v", got, tt.want)
			}
		})
	}
}
+
// Test_inferProjection verifies that the projection is built from bson tags,
// skipping fields tagged with "-".
func Test_inferProjection(t *testing.T) {
	type doc struct {
		Field1 string `bson:"field1"`
		Field2 string `bson:"field2"`
		Field3 string `bson:"-"`
	}

	tests := []struct {
		name   string
		t      reflect.Type
		tagKey string
		want   bson.D
	}{
		{
			name:   "Infer projection from struct bson tags",
			t:      reflect.TypeOf(doc{}),
			tagKey: "bson",
			want: bson.D{
				{Key: "field1", Value: 1},
				{Key: "field2", Value: 1},
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if got := inferProjection(tt.t, tt.tagKey); !cmp.Equal(got, tt.want) {
				t.Errorf("inferProjection() = %v, want %v", got, tt.want)
			}
		})
	}
}
+
// Test_inferProjectionPanic verifies that a struct with no projectable
// fields causes a panic.
func Test_inferProjectionPanic(t *testing.T) {
	type doc struct{}

	t.Run("Panic when type has no fields to infer", func(t *testing.T) {
		defer func() {
			if r := recover(); r == nil {
				t.Errorf("inferProjection() does not panic")
			}
		}()

		inferProjection(reflect.TypeOf(doc{}), "bson")
	})
}
+
// Test_mergeFilters verifies the $and combination of the ID filter and the
// custom filter, and the pass-through behavior when either side is empty.
func Test_mergeFilters(t *testing.T) {
	tests := []struct {
		name     string
		idFilter bson.M
		filter   bson.M
		want     bson.M
	}{
		{
			name: "Returned merged ID filter and custom filter in an $and filter",
			idFilter: bson.M{
				"_id": bson.M{
					"$gte": 10,
				},
			},
			filter: bson.M{
				"key": bson.M{
					"$ne": "value",
				},
			},
			want: bson.M{
				"$and": []bson.M{
					{
						"_id": bson.M{
							"$gte": 10,
						},
					},
					{
						"key": bson.M{
							"$ne": "value",
						},
					},
				},
			},
		},
		{
			name: "Return only ID filter when custom filter is empty",
			idFilter: bson.M{
				"_id": bson.M{
					"$gte": 10,
				},
			},
			filter: bson.M{},
			want: bson.M{
				"_id": bson.M{
					"$gte": 10,
				},
			},
		},
		{
			name:     "Return only custom filter when ID filter is empty",
			idFilter: bson.M{},
			filter: bson.M{
				"key": bson.M{
					"$ne": "value",
				},
			},
			want: bson.M{
				"key": bson.M{
					"$ne": "value",
				},
			},
		},
		{
			name:     "Return empty filter when both ID filter and custom filter are empty",
			idFilter: bson.M{},
			filter:   bson.M{},
			want:     bson.M{},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if got := mergeFilters(tt.idFilter, tt.filter); !cmp.Equal(got, tt.want) {
				t.Errorf("mergeFilters() = %v, want %v", got, tt.want)
			}
		})
	}
}
diff --git a/sdks/go/pkg/beam/io/mongodbio/write.go b/sdks/go/pkg/beam/io/mongodbio/write.go
new file mode 100644
index 00000000000..2332e3ba981
--- /dev/null
+++ b/sdks/go/pkg/beam/io/mongodbio/write.go
@@ -0,0 +1,204 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mongodbio
+
+import (
+	"context"
+	"fmt"
+	"reflect"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/util/structx"
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"go.mongodb.org/mongo-driver/mongo"
+	"go.mongodb.org/mongo-driver/mongo/options"
+)
+
const (
	// defaultWriteBatchSize is the default number of documents buffered per
	// bulk write when WithWriteBatchSize is not supplied.
	defaultWriteBatchSize = 1000
	// defaultWriteOrdered is the default ordering mode for bulk writes.
	defaultWriteOrdered   = true
)
+
// init registers the write-side functions, DoFns, and emitted element types
// with the Beam SDK so they can be serialized and executed by distributed
// runners.
func init() {
	register.Function1x2(createIDFn)
	register.Emitter2[primitive.ObjectID, beam.Y]()

	register.DoFn3x0[context.Context, beam.Y, func(beam.X, beam.Y)](
		&extractIDFn{},
	)
	register.Emitter2[beam.X, beam.Y]()

	register.DoFn4x1[context.Context, beam.X, beam.Y, func(beam.X), error](
		&writeFn{},
	)
	register.Emitter1[primitive.ObjectID]()
}
+
// Write writes a PCollection<T> of a type T to MongoDB. T must be a struct with exported fields
// that should have a "bson" tag. If the struct has a field with the bson tag "_id", the value of
// that field will be used as the id of the document. Otherwise, a new id field of type
// primitive.ObjectID will be generated for each document. Write returns a PCollection<K> of the
// inserted id values with type K.
//
// The Write transform has the required parameters:
//   - s: the scope of the pipeline
//   - uri: the MongoDB connection string
//   - database: the MongoDB database to write to
//   - collection: the MongoDB collection to write to
//   - col: the PCollection to write to MongoDB
//
// The Write transform takes a variadic number of WriteOptionFn which can set the WriteOption
// fields:
//   - BatchSize: the number of documents to write in a single batch. Defaults to 1000
//   - Ordered: whether to execute the writes in order. Defaults to true
func Write(
	s beam.Scope,
	uri string,
	database string,
	collection string,
	col beam.PCollection,
	opts ...WriteOptionFn,
) beam.PCollection {
	s = s.Scope("mongodbio.Write")

	// Start from the defaults and let each option function mutate the struct;
	// an option returning an error indicates invalid user input, so fail fast.
	option := &WriteOption{
		BatchSize: defaultWriteBatchSize,
		Ordered:   defaultWriteOrdered,
	}

	for _, opt := range opts {
		if err := opt(option); err != nil {
			panic(fmt.Sprintf("mongodbio.Write: invalid option: %v", err))
		}
	}

	t := col.Type().Type()
	idIndex := structx.FieldIndexByTag(t, bsonTag, "_id")

	var keyed beam.PCollection

	if idIndex == -1 {
		// No "_id" field: generate a fresh ObjectID key per element.
		// NOTE(review): the Reshuffle appears intended to checkpoint the
		// generated IDs so retries do not mint new ones — confirm against
		// runner semantics.
		pre := beam.ParDo(s, createIDFn, col)
		keyed = beam.Reshuffle(s, pre)
	} else {
		// An "_id" field exists: key each element by its own ID value.
		keyed = beam.ParDo(
			s,
			newExtractIDFn(idIndex),
			col,
			beam.TypeDefinition{Var: beam.XType, T: t.Field(idIndex).Type},
		)
	}

	return beam.ParDo(
		s,
		newWriteFn(uri, database, collection, option),
		keyed,
	)
}
+
+func createIDFn(elem beam.Y) (primitive.ObjectID, beam.Y) {
+	id := primitive.NewObjectID()
+	return id, elem
+}
+
// extractIDFn is a DoFn that keys each element by the value of its "_id"
// struct field.
type extractIDFn struct {
	// IDIndex is the index of the "_id"-tagged field within the element's
	// struct type.
	IDIndex int
}

// newExtractIDFn creates an extractIDFn for the given field index.
func newExtractIDFn(idIndex int) *extractIDFn {
	return &extractIDFn{
		IDIndex: idIndex,
	}
}

// ProcessElement emits the element keyed by its ID field value.
// NOTE(review): reflect.ValueOf(elem).Field assumes elem is a struct value
// (not a pointer), which matches how Write derives the field index.
func (fn *extractIDFn) ProcessElement(
	_ context.Context,
	elem beam.Y,
	emit func(beam.X, beam.Y),
) {
	id := reflect.ValueOf(elem).Field(fn.IDIndex).Interface()
	emit(id, elem)
}
+
// writeFn is a DoFn that buffers keyed elements into batched bulk upserts
// against MongoDB, flushing when the batch is full and at bundle completion.
type writeFn struct {
	mongoDBFn
	// BatchSize is the number of documents to buffer before flushing.
	BatchSize int64
	// Ordered controls whether the bulk write is executed in order.
	Ordered bool
	// models is the in-memory buffer of pending write models; it is per-DoFn
	// state and not serialized.
	models []mongo.WriteModel
}

// newWriteFn creates a writeFn for the given connection parameters and write
// options.
func newWriteFn(
	uri string,
	database string,
	collection string,
	option *WriteOption,
) *writeFn {
	return &writeFn{
		mongoDBFn: mongoDBFn{
			URI:        uri,
			Database:   database,
			Collection: collection,
		},
		BatchSize: option.BatchSize,
		Ordered:   option.Ordered,
	}
}
+
// ProcessElement buffers a ReplaceOne upsert (keyed on _id) for the element
// and flushes the buffer once it reaches the configured batch size. The key
// is re-emitted as the inserted ID; any remaining buffered writes are flushed
// in FinishBundle.
func (fn *writeFn) ProcessElement(
	ctx context.Context,
	key beam.X,
	value beam.Y,
	emit func(beam.X),
) error {
	// Upsert so writing is idempotent under bundle retries: re-processing an
	// element replaces the same document rather than duplicating it.
	model := mongo.NewReplaceOneModel().
		SetFilter(bson.M{"_id": key}).
		SetUpsert(true).
		SetReplacement(value)

	fn.models = append(fn.models, model)

	if len(fn.models) >= int(fn.BatchSize) {
		if err := fn.flush(ctx); err != nil {
			return err
		}
	}

	emit(key)

	return nil
}
+
+func (fn *writeFn) FinishBundle(ctx context.Context, _ func(beam.X)) error {
+	if len(fn.models) > 0 {
+		return fn.flush(ctx)
+	}
+
+	return nil
+}
+
// flush executes the buffered write models as a single bulk write and clears
// the buffer on success. On failure the buffer is left intact.
func (fn *writeFn) flush(ctx context.Context) error {
	opts := options.BulkWrite().SetOrdered(fn.Ordered)

	if _, err := fn.collection.BulkWrite(ctx, fn.models, opts); err != nil {
		return fmt.Errorf("error bulk writing to MongoDB: %w", err)
	}

	fn.models = nil

	return nil
}
diff --git a/sdks/go/pkg/beam/io/mongodbio/write_option.go b/sdks/go/pkg/beam/io/mongodbio/write_option.go
new file mode 100644
index 00000000000..8d54b6052b8
--- /dev/null
+++ b/sdks/go/pkg/beam/io/mongodbio/write_option.go
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mongodbio
+
+import (
+	"errors"
+)
+
// WriteOption holds the configurable settings for a MongoDB write: the number
// of documents written per bulk operation and whether the bulk write is
// ordered.
type WriteOption struct {
	BatchSize int64
	Ordered   bool
}

// WriteOptionFn is a function that configures a WriteOption.
type WriteOptionFn func(option *WriteOption) error

// WithWriteBatchSize configures the WriteOption to use the provided batch size
// when writing documents. The batch size must be positive.
func WithWriteBatchSize(batchSize int64) WriteOptionFn {
	return func(o *WriteOption) error {
		if batchSize < 1 {
			return errors.New("batch size must be greater than 0")
		}
		o.BatchSize = batchSize

		return nil
	}
}

// WithWriteOrdered configures the WriteOption whether to apply an ordered bulk
// write.
func WithWriteOrdered(ordered bool) WriteOptionFn {
	return func(o *WriteOption) error {
		o.Ordered = ordered

		return nil
	}
}
diff --git a/sdks/go/pkg/beam/io/mongodbio/write_option_test.go b/sdks/go/pkg/beam/io/mongodbio/write_option_test.go
new file mode 100644
index 00000000000..1e4e66bfbc4
--- /dev/null
+++ b/sdks/go/pkg/beam/io/mongodbio/write_option_test.go
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mongodbio
+
+import (
+	"testing"
+)
+
// TestWithWriteBatchSize verifies that WithWriteBatchSize stores a positive
// batch size on the option and returns an error for non-positive values
// (leaving the option at its zero value).
func TestWithWriteBatchSize(t *testing.T) {
	tests := []struct {
		name      string
		batchSize int64
		want      int64
		wantErr   bool
	}{
		{
			name:      "Set batch size to 500",
			batchSize: 500,
			want:      500,
			wantErr:   false,
		},
		{
			name:      "Error - batch size must be greater than 0",
			batchSize: 0,
			wantErr:   true,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			var option WriteOption

			if err := WithWriteBatchSize(tt.batchSize)(&option); (err != nil) != tt.wantErr {
				t.Fatalf("WithWriteBatchSize() error = %v, wantErr %v", err, tt.wantErr)
			}

			// In the error case, want is the zero value, so this also checks
			// that a rejected batch size leaves the option untouched.
			if option.BatchSize != tt.want {
				t.Errorf("option.BatchSize = %v, want %v", option.BatchSize, tt.want)
			}
		})
	}
}
+
// TestWithWriteOrdered verifies that WithWriteOrdered stores the ordered flag
// on the option without error.
func TestWithWriteOrdered(t *testing.T) {
	tests := []struct {
		name    string
		ordered bool
		want    bool
		wantErr bool
	}{
		{
			name:    "Set ordered to true",
			ordered: true,
			want:    true,
			wantErr: false,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			var option WriteOption

			if err := WithWriteOrdered(tt.ordered)(&option); (err != nil) != tt.wantErr {
				t.Fatalf("WithWriteOrdered() err = %v, wantErr %v", err, tt.wantErr)
			}

			if option.Ordered != tt.want {
				t.Errorf("option.Ordered = %v, want %v", option.Ordered, tt.want)
			}
		})
	}
}
diff --git a/sdks/go/pkg/beam/io/mongodbio/write_test.go b/sdks/go/pkg/beam/io/mongodbio/write_test.go
new file mode 100644
index 00000000000..6608df8362b
--- /dev/null
+++ b/sdks/go/pkg/beam/io/mongodbio/write_test.go
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mongodbio
+
+import (
+	"testing"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/google/go-cmp/cmp"
+)
+
// Test_createIDFn verifies that createIDFn returns the element unchanged as
// the value and a freshly generated (non-zero) object ID as the key.
func Test_createIDFn(t *testing.T) {
	type doc struct {
		Field1 int32 `bson:"field1"`
	}

	tests := []struct {
		name string
		elem beam.Y
		want beam.Y
	}{
		{
			name: "Create key-value pair of a new object ID and element",
			elem: doc{Field1: 1},
			want: doc{Field1: 1},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			gotKey, gotValue := createIDFn(tt.elem)

			// The generated ID is random, so only assert that it is non-zero.
			if gotKey.IsZero() {
				t.Error("createIDFn() gotKey is zero")
			}

			if !cmp.Equal(gotValue, tt.want) {
				t.Errorf("createIDFn() gotValue = %v, want %v", gotValue, tt.want)
			}
		})
	}
}
diff --git a/sdks/go/pkg/beam/util/structx/struct.go b/sdks/go/pkg/beam/util/structx/struct.go
index 2659191d38d..aec8a63652d 100644
--- a/sdks/go/pkg/beam/util/structx/struct.go
+++ b/sdks/go/pkg/beam/util/structx/struct.go
@@ -56,3 +56,22 @@ func InferFieldNames(t reflect.Type, key string) []string {
 
 	return names
 }
+
+// FieldIndexByTag returns the index of the field with the given tag key and value. Returns -1 if
+// the field is not found. Panics if the type's kind is not a struct.
+func FieldIndexByTag(t reflect.Type, key string, value string) int {
+	if t.Kind() != reflect.Struct {
+		panic(fmt.Sprintf("structx: FieldIndexByTag of non-struct type %s", t))
+	}
+
+	for i := 0; i < t.NumField(); i++ {
+		values := t.Field(i).Tag.Get(key)
+		name := strings.Split(values, ",")[0]
+
+		if name == value {
+			return i
+		}
+	}
+
+	return -1
+}
diff --git a/sdks/go/pkg/beam/util/structx/struct_test.go b/sdks/go/pkg/beam/util/structx/struct_test.go
index 6aac5869604..ab6c7278f62 100644
--- a/sdks/go/pkg/beam/util/structx/struct_test.go
+++ b/sdks/go/pkg/beam/util/structx/struct_test.go
@@ -142,3 +142,63 @@ func TestInferFieldNamesPanic(t *testing.T) {
 		InferFieldNames(reflect.TypeOf(""), "key")
 	})
 }
+
// TestFieldIndexByTag verifies that FieldIndexByTag finds a field by tag key
// and value, and returns -1 when either the key or the value has no match.
func TestFieldIndexByTag(t *testing.T) {
	tests := []struct {
		name  string
		t     reflect.Type
		key   string
		value string
		want  int
	}{
		{
			name: "Return index of field with matching tag key and value",
			t: reflect.TypeOf(struct {
				Field1 string `key:"field1"`
				Field2 string `key:"field2"`
			}{}),
			key:   "key",
			value: "field2",
			want:  1,
		},
		{
			name: "Return -1 for non-existent tag key",
			t: reflect.TypeOf(struct {
				Field1 string `key:"field1"`
				Field2 string `key:"field2"`
			}{}),
			key:   "other",
			value: "field1",
			want:  -1,
		},
		{
			name: "Return -1 for non-existent tag value",
			t: reflect.TypeOf(struct {
				Field1 string `key:"field1"`
				Field2 string `key:"field2"`
			}{}),
			key:   "key",
			value: "field3",
			want:  -1,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if got := FieldIndexByTag(tt.t, tt.key, tt.value); got != tt.want {
				t.Errorf("FieldIndexByTag() = %v, want %v", got, tt.want)
			}
		})
	}
}
+
// TestFieldIndexByTagPanic verifies that FieldIndexByTag panics when passed a
// non-struct type.
func TestFieldIndexByTagPanic(t *testing.T) {
	t.Run("Panic for non-struct type", func(t *testing.T) {
		// recover only works inside a deferred function; a nil recover result
		// means no panic occurred.
		defer func() {
			if r := recover(); r == nil {
				t.Errorf("FieldIndexByTag() does not panic")
			}
		}()

		FieldIndexByTag(reflect.TypeOf(""), "key", "field1")
	})
}
diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go
index 2253df597bb..c13d8b16692 100644
--- a/sdks/go/test/integration/integration.go
+++ b/sdks/go/test/integration/integration.go
@@ -223,6 +223,7 @@ var dataflowFilters = []string{
 	"TestJDBCIO_BasicReadWrite",
 	"TestJDBCIO_PostgresReadWrite",
 	"TestDebeziumIO_BasicRead",
+	"TestMongoDBIO.*",
 	// TODO(BEAM-11576): TestFlattenDup failing on this runner.
 	"TestFlattenDup",
 	// The Dataflow runner does not support the TestStream primitive
diff --git a/sdks/go/test/integration/internal/containers/containers.go b/sdks/go/test/integration/internal/containers/containers.go
new file mode 100644
index 00000000000..5987897fd41
--- /dev/null
+++ b/sdks/go/test/integration/internal/containers/containers.go
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package containers contains utilities for running test containers in integration tests.
+package containers
+
+import (
+	"context"
+	"testing"
+
+	"github.com/docker/go-connections/nat"
+	"github.com/testcontainers/testcontainers-go"
+)
+
// ContainerOptionFn is a function that customizes a container request before
// the container is created.
type ContainerOptionFn func(*testcontainers.ContainerRequest)

// WithPorts configures the container request to expose the given ports, in
// testcontainers "port/protocol" form (e.g. "27017/tcp").
func WithPorts(ports []string) ContainerOptionFn {
	return func(option *testcontainers.ContainerRequest) {
		option.ExposedPorts = ports
	}
}
+
// NewContainer starts a container from the given image, applying any options
// to the request first, and registers a test cleanup hook that terminates the
// container when the test finishes. Fails the test if the container cannot be
// created or, during cleanup, cannot be terminated.
func NewContainer(
	ctx context.Context,
	t *testing.T,
	image string,
	opts ...ContainerOptionFn,
) testcontainers.Container {
	t.Helper()

	request := testcontainers.ContainerRequest{Image: image}

	for _, opt := range opts {
		opt(&request)
	}

	// Started: true makes GenericContainer both create and start the
	// container before returning.
	genericRequest := testcontainers.GenericContainerRequest{
		ContainerRequest: request,
		Started:          true,
	}

	container, err := testcontainers.GenericContainer(ctx, genericRequest)
	if err != nil {
		t.Fatalf("error creating container: %v", err)
	}

	t.Cleanup(func() {
		if err := container.Terminate(ctx); err != nil {
			t.Fatalf("error terminating container: %v", err)
		}
	})

	return container
}
+
+func Port(
+	ctx context.Context,
+	t *testing.T,
+	container testcontainers.Container,
+	port nat.Port,
+) string {
+	t.Helper()
+
+	mappedPort, err := container.MappedPort(ctx, port)
+	if err != nil {
+		t.Fatalf("error getting mapped port: %v", err)
+	}
+
+	return mappedPort.Port()
+}
diff --git a/sdks/go/test/integration/io/mongodbio/helper_test.go b/sdks/go/test/integration/io/mongodbio/helper_test.go
new file mode 100644
index 00000000000..751f9e56787
--- /dev/null
+++ b/sdks/go/test/integration/io/mongodbio/helper_test.go
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mongodbio
+
+import (
+	"context"
+	"testing"
+
+	"github.com/apache/beam/sdks/v2/go/test/integration/internal/containers"
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"go.mongodb.org/mongo-driver/mongo"
+	"go.mongodb.org/mongo-driver/mongo/options"
+	"go.mongodb.org/mongo-driver/mongo/readpref"
+)
+
+const (
+	mongoImage = "mongo:6.0.3"
+	mongoPort  = "27017"
+)
+
// setUpTestContainer starts a MongoDB test container exposing the default
// MongoDB port and returns the host port mapped to it. The container is
// terminated automatically when the test finishes (via NewContainer's
// cleanup hook).
func setUpTestContainer(ctx context.Context, t *testing.T) string {
	t.Helper()

	container := containers.NewContainer(
		ctx,
		t,
		mongoImage,
		containers.WithPorts([]string{mongoPort + "/tcp"}),
	)

	return containers.Port(ctx, t, container, mongoPort)
}
+
+func objectIDFromHex(t *testing.T, hex string) primitive.ObjectID {
+	t.Helper()
+
+	id, err := primitive.ObjectIDFromHex(hex)
+	if err != nil {
+		t.Fatalf("error parsing hex string to primitive.ObjectID: %v", err)
+	}
+
+	return id
+}
+
// newClient connects to the MongoDB instance at uri, verifies the connection
// with a ping against the primary, and registers a cleanup hook that
// disconnects the client when the test finishes. Fails the test on any error.
func newClient(ctx context.Context, t *testing.T, uri string) *mongo.Client {
	t.Helper()

	opts := options.Client().ApplyURI(uri)

	client, err := mongo.Connect(ctx, opts)
	if err != nil {
		t.Fatalf("error connecting to MongoDB: %v", err)
	}

	t.Cleanup(func() {
		if err := client.Disconnect(ctx); err != nil {
			t.Fatalf("error disconnecting from MongoDB: %v", err)
		}
	})

	// Connect does not block for server discovery, so ping to confirm the
	// server is actually reachable before the test proceeds.
	if err := client.Ping(ctx, readpref.Primary()); err != nil {
		t.Fatalf("error pinging MongoDB: %v", err)
	}

	return client
}
+
+func dropCollection(ctx context.Context, t *testing.T, collection *mongo.Collection) {
+	t.Helper()
+
+	if err := collection.Drop(ctx); err != nil {
+		t.Fatalf("error dropping collection: %v", err)
+	}
+}
+
// readDocuments returns all documents in the collection decoded as bson.M
// maps, failing the test on any query or decoding error.
func readDocuments(
	ctx context.Context,
	t *testing.T,
	collection *mongo.Collection,
) []bson.M {
	t.Helper()

	// An empty filter matches every document in the collection.
	cursor, err := collection.Find(ctx, bson.M{})
	if err != nil {
		t.Fatalf("error finding documents: %v", err)
	}

	var documents []bson.M
	if err = cursor.All(ctx, &documents); err != nil {
		t.Fatalf("error decoding documents: %v", err)
	}

	return documents
}
+
// writeDocuments inserts the given documents into the collection, failing the
// test on error.
func writeDocuments(
	ctx context.Context,
	t *testing.T,
	collection *mongo.Collection,
	documents []any,
) {
	t.Helper()

	if _, err := collection.InsertMany(ctx, documents); err != nil {
		t.Fatalf("error inserting documents: %v", err)
	}
}
diff --git a/sdks/go/test/integration/io/mongodbio/mongodbio_test.go b/sdks/go/test/integration/io/mongodbio/mongodbio_test.go
new file mode 100644
index 00000000000..b8885e7c728
--- /dev/null
+++ b/sdks/go/test/integration/io/mongodbio/mongodbio_test.go
@@ -0,0 +1,237 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mongodbio
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"reflect"
+	"testing"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/mongodbio"
+	_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow"
+	_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink"
+	_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/samza"
+	_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/spark"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+	"github.com/apache/beam/sdks/v2/go/test/integration"
+	"github.com/google/go-cmp/cmp"
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+)
+
// init registers the test document types with the Beam type registry so
// pipeline workers can encode and decode them.
func init() {
	beam.RegisterType(reflect.TypeOf((*docWithObjectID)(nil)).Elem())
	beam.RegisterType(reflect.TypeOf((*docWithStringID)(nil)).Elem())
}
+
// docWithObjectID is a test document whose "_id" is a MongoDB object ID.
type docWithObjectID struct {
	ID     primitive.ObjectID `bson:"_id"`
	Field1 int32              `bson:"field1"`
}

// docWithStringID is a test document whose "_id" is a plain string.
type docWithStringID struct {
	ID     string `bson:"_id"`
	Field1 int32  `bson:"field1"`
}
+
// TestMongoDBIO_Read seeds a containerized MongoDB instance with documents and
// verifies that mongodbio.Read returns the expected struct values — with
// object-ID and string IDs, with a filter applied, and with the bucketAuto
// aggregation enabled.
func TestMongoDBIO_Read(t *testing.T) {
	integration.CheckFilters(t)

	ctx := context.Background()
	port := setUpTestContainer(ctx, t)
	uri := fmt.Sprintf("mongodb://%s:%s", "localhost", port)

	tests := []struct {
		name    string
		input   []any
		t       reflect.Type
		options []mongodbio.ReadOptionFn
		want    []any
	}{
		{
			name: "Read documents from MongoDB with id of type primitive.ObjectID",
			input: []any{
				bson.M{"_id": objectIDFromHex(t, "61cf9980dd2d24dc5cf28620"), "field1": int32(0)},
				bson.M{"_id": objectIDFromHex(t, "61cf9980dd2d24dc5cf28621"), "field1": int32(1)},
				bson.M{"_id": objectIDFromHex(t, "61cf9980dd2d24dc5cf28622"), "field1": int32(2)},
			},
			t: reflect.TypeOf(docWithObjectID{}),
			want: []any{
				docWithObjectID{ID: objectIDFromHex(t, "61cf9980dd2d24dc5cf28620"), Field1: 0},
				docWithObjectID{ID: objectIDFromHex(t, "61cf9980dd2d24dc5cf28621"), Field1: 1},
				docWithObjectID{ID: objectIDFromHex(t, "61cf9980dd2d24dc5cf28622"), Field1: 2},
			},
		},
		{
			name: "Read documents from MongoDB with id of type string",
			input: []any{
				bson.M{"_id": "id01", "field1": int32(0)},
				bson.M{"_id": "id02", "field1": int32(1)},
				bson.M{"_id": "id03", "field1": int32(2)},
			},
			t: reflect.TypeOf(docWithStringID{}),
			want: []any{
				docWithStringID{ID: "id01", Field1: 0},
				docWithStringID{ID: "id02", Field1: 1},
				docWithStringID{ID: "id03", Field1: 2},
			},
		},
		{
			name: "Read documents from MongoDB where filter matches",
			input: []any{
				bson.M{"_id": objectIDFromHex(t, "61cf9980dd2d24dc5cf28620"), "field1": int32(0)},
				bson.M{"_id": objectIDFromHex(t, "61cf9980dd2d24dc5cf28621"), "field1": int32(1)},
				bson.M{"_id": objectIDFromHex(t, "61cf9980dd2d24dc5cf28622"), "field1": int32(2)},
			},
			t: reflect.TypeOf(docWithObjectID{}),
			options: []mongodbio.ReadOptionFn{
				mongodbio.WithReadFilter(bson.M{"field1": bson.M{"$gt": 0}}),
			},
			// The filter excludes the document with field1 == 0.
			want: []any{
				docWithObjectID{ID: objectIDFromHex(t, "61cf9980dd2d24dc5cf28621"), Field1: 1},
				docWithObjectID{ID: objectIDFromHex(t, "61cf9980dd2d24dc5cf28622"), Field1: 2},
			},
		},
		{
			name: "Read documents from MongoDB with bucketAuto aggregation",
			input: []any{
				bson.M{"_id": objectIDFromHex(t, "61cf9980dd2d24dc5cf28620"), "field1": int32(0)},
				bson.M{"_id": objectIDFromHex(t, "61cf9980dd2d24dc5cf28621"), "field1": int32(1)},
				bson.M{"_id": objectIDFromHex(t, "61cf9980dd2d24dc5cf28622"), "field1": int32(2)},
			},
			t: reflect.TypeOf(docWithObjectID{}),
			options: []mongodbio.ReadOptionFn{
				mongodbio.WithReadBucketAuto(true),
			},
			want: []any{
				docWithObjectID{ID: objectIDFromHex(t, "61cf9980dd2d24dc5cf28620"), Field1: 0},
				docWithObjectID{ID: objectIDFromHex(t, "61cf9980dd2d24dc5cf28621"), Field1: 1},
				docWithObjectID{ID: objectIDFromHex(t, "61cf9980dd2d24dc5cf28622"), Field1: 2},
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			database := "db"
			collection := "coll"

			client := newClient(ctx, t, uri)
			mongoCollection := client.Database(database).Collection(collection)

			// Drop the collection after each case so cases do not observe
			// each other's documents.
			t.Cleanup(func() {
				dropCollection(ctx, t, mongoCollection)
			})

			writeDocuments(ctx, t, mongoCollection, tt.input)

			p, s := beam.NewPipelineWithRoot()

			got := mongodbio.Read(s, uri, database, collection, tt.t, tt.options...)

			passert.Equals(s, got, tt.want...)
			ptest.RunAndValidate(t, p)
		})
	}
}
+
// TestMongoDBIO_Write writes documents through mongodbio.Write to a
// containerized MongoDB instance and verifies both the IDs emitted by the
// transform and the documents actually stored in the collection.
func TestMongoDBIO_Write(t *testing.T) {
	integration.CheckFilters(t)

	ctx := context.Background()
	port := setUpTestContainer(ctx, t)
	uri := fmt.Sprintf("mongodb://%s:%s", "localhost", port)

	tests := []struct {
		name     string
		input    []any
		options  []mongodbio.WriteOptionFn
		wantIDs  []any
		wantDocs []bson.M
	}{
		{
			name: "Write documents to MongoDB with id of type primitive.ObjectID",
			input: []any{
				docWithObjectID{ID: objectIDFromHex(t, "61cf9980dd2d24dc5cf28620"), Field1: 0},
				docWithObjectID{ID: objectIDFromHex(t, "61cf9980dd2d24dc5cf28621"), Field1: 1},
				docWithObjectID{ID: objectIDFromHex(t, "61cf9980dd2d24dc5cf28622"), Field1: 2},
			},
			wantIDs: []any{
				objectIDFromHex(t, "61cf9980dd2d24dc5cf28620"),
				objectIDFromHex(t, "61cf9980dd2d24dc5cf28621"),
				objectIDFromHex(t, "61cf9980dd2d24dc5cf28622"),
			},
			wantDocs: []bson.M{
				{"_id": objectIDFromHex(t, "61cf9980dd2d24dc5cf28620"), "field1": int32(0)},
				{"_id": objectIDFromHex(t, "61cf9980dd2d24dc5cf28621"), "field1": int32(1)},
				{"_id": objectIDFromHex(t, "61cf9980dd2d24dc5cf28622"), "field1": int32(2)},
			},
		},
		{
			name: "Write documents to MongoDB with id of type string",
			input: []any{
				docWithStringID{ID: "id01", Field1: 0},
				docWithStringID{ID: "id02", Field1: 1},
				docWithStringID{ID: "id03", Field1: 2},
			},
			wantIDs: []any{
				"id01",
				"id02",
				"id03",
			},
			wantDocs: []bson.M{
				{"_id": "id01", "field1": int32(0)},
				{"_id": "id02", "field1": int32(1)},
				{"_id": "id03", "field1": int32(2)},
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			database := "db"
			collection := "coll"

			client := newClient(ctx, t, uri)
			mongoCollection := client.Database(database).Collection(collection)

			t.Cleanup(func() {
				dropCollection(ctx, t, mongoCollection)
			})

			p, s := beam.NewPipelineWithRoot()

			col := beam.CreateList(s, tt.input)
			gotIDs := mongodbio.Write(s, uri, database, collection, col, tt.options...)

			passert.Equals(s, gotIDs, tt.wantIDs...)
			ptest.RunAndValidate(t, p)

			// Read the collection back directly to confirm the documents were
			// persisted as expected, not just that IDs were emitted.
			if gotDocs := readDocuments(ctx, t, mongoCollection); !cmp.Equal(gotDocs, tt.wantDocs) {
				t.Errorf("readDocuments() = %v, want %v", gotDocs, tt.wantDocs)
			}
		})
	}
}
+
// TestMain parses flags and initializes the Beam SDK before running the
// suite through ptest so pipeline tests get a configured runner. The return
// value of MainRet is not passed to os.Exit here; since Go 1.15 the testing
// package propagates m.Run's result when TestMain returns — presumably relied
// upon here, confirm against the project's minimum Go version.
func TestMain(m *testing.M) {
	flag.Parse()
	beam.Init()

	ptest.MainRet(m)
}