You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/12/28 17:55:59 UTC
[beam] branch master updated: [Go SDK]: MongoDB IO connector (#24663)
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 1e763ad6d18 [Go SDK]: MongoDB IO connector (#24663)
1e763ad6d18 is described below
commit 1e763ad6d18f80de21d2db0d637cc7e8800d45ab
Author: Johanna Öjeling <51...@users.noreply.github.com>
AuthorDate: Wed Dec 28 18:55:47 2022 +0100
[Go SDK]: MongoDB IO connector (#24663)
---
CHANGES.md | 1 +
sdks/go.mod | 9 +-
sdks/go.sum | 23 +-
sdks/go/pkg/beam/io/mongodbio/coder.go | 68 +++
sdks/go/pkg/beam/io/mongodbio/coder_test.go | 160 +++++++
sdks/go/pkg/beam/io/mongodbio/common.go | 73 +++
sdks/go/pkg/beam/io/mongodbio/example_test.go | 180 ++++++++
sdks/go/pkg/beam/io/mongodbio/helper_test.go | 33 ++
sdks/go/pkg/beam/io/mongodbio/read.go | 492 +++++++++++++++++++++
sdks/go/pkg/beam/io/mongodbio/read_option.go | 60 +++
sdks/go/pkg/beam/io/mongodbio/read_option_test.go | 115 +++++
sdks/go/pkg/beam/io/mongodbio/read_test.go | 393 ++++++++++++++++
sdks/go/pkg/beam/io/mongodbio/write.go | 204 +++++++++
sdks/go/pkg/beam/io/mongodbio/write_option.go | 50 +++
sdks/go/pkg/beam/io/mongodbio/write_option_test.go | 83 ++++
sdks/go/pkg/beam/io/mongodbio/write_test.go | 54 +++
sdks/go/pkg/beam/util/structx/struct.go | 19 +
sdks/go/pkg/beam/util/structx/struct_test.go | 60 +++
sdks/go/test/integration/integration.go | 1 +
.../integration/internal/containers/containers.go | 82 ++++
.../test/integration/io/mongodbio/helper_test.go | 121 +++++
.../integration/io/mongodbio/mongodbio_test.go | 237 ++++++++++
22 files changed, 2516 insertions(+), 2 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 5bbfebec6b1..e856ef18dab 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -59,6 +59,7 @@
## I/Os
* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+* MongoDB IO connector added (Go) ([#24575](https://github.com/apache/beam/issues/24575)).
## New Features / Improvements
diff --git a/sdks/go.mod b/sdks/go.mod
index c7ef57eedbe..e9d206ee49b 100644
--- a/sdks/go.mod
+++ b/sdks/go.mod
@@ -48,6 +48,7 @@ require (
github.com/testcontainers/testcontainers-go v0.15.0
github.com/xitongsys/parquet-go v1.6.2
github.com/xitongsys/parquet-go-source v0.0.0-20220315005136-aec0fe3e777c
+ go.mongodb.org/mongo-driver v1.11.1
golang.org/x/net v0.4.0
golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783
golang.org/x/sync v0.1.0
@@ -113,11 +114,12 @@ require (
github.com/googleapis/gax-go/v2 v2.7.0 // indirect
github.com/inconshreveable/mousetrap v1.0.1 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
- github.com/klauspost/compress v1.13.1 // indirect
+ github.com/klauspost/compress v1.13.6 // indirect
github.com/magiconair/properties v1.8.6 // indirect
github.com/moby/sys/mount v0.3.3 // indirect
github.com/moby/sys/mountinfo v0.6.2 // indirect
github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 // indirect
+ github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799 // indirect
@@ -128,7 +130,12 @@ require (
github.com/shabbyrobe/gocovmerge v0.0.0-20180507124511-f6ea450bfb63 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
+ github.com/xdg-go/pbkdf2 v1.0.0 // indirect
+ github.com/xdg-go/scram v1.1.1 // indirect
+ github.com/xdg-go/stringprep v1.0.3 // indirect
+ github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
go.opencensus.io v0.24.0 // indirect
+ golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect
golang.org/x/tools v0.1.12 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/appengine v1.6.7 // indirect
diff --git a/sdks/go.sum b/sdks/go.sum
index 3d7c21812c0..49045a6b937 100644
--- a/sdks/go.sum
+++ b/sdks/go.sum
@@ -469,6 +469,7 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
@@ -580,8 +581,9 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o
github.com/klauspost/compress v1.9.7/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.11.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.11.13/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
-github.com/klauspost/compress v1.13.1 h1:wXr2uRxZTJXHLly6qhJabee5JqIhTRoLBhDOA74hDEQ=
github.com/klauspost/compress v1.13.1/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
+github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc=
+github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
@@ -639,6 +641,8 @@ github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJ
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
+github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0=
+github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A=
github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc=
github.com/mrunalp/fileutils v0.5.0/go.mod h1:M1WthSahJixYnrXQl/DFQuteStB1weuxD2QJNHXfbSQ=
@@ -816,6 +820,8 @@ github.com/testcontainers/testcontainers-go v0.15.0 h1:3Ex7PUGFv0b2bBsdOv6R42+SK
github.com/testcontainers/testcontainers-go v0.15.0/go.mod h1:PkohMRH2X8Hib0IWtifVexDfLPVT+tb5E9hsf7cW12w=
github.com/tetratelabs/wazero v1.0.0-pre.4 h1:RBJQT5OzmORkSp6MmZDWoFEr0zXjk4pmvMKAdeUnsaI=
github.com/tetratelabs/wazero v1.0.0-pre.4/go.mod h1:u8wrFmpdrykiFK0DFPiFm5a4+0RzsdmXYVtijBKqUVo=
+github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4=
+github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
@@ -831,6 +837,12 @@ github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17
github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
github.com/willf/bitset v1.1.11-0.20200630133818-d5bec3311243/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
github.com/willf/bitset v1.1.11/go.mod h1:83CECat5yLh5zVOf4P1ErAgKA5UDvKtgyUABdr3+MjI=
+github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
+github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
+github.com/xdg-go/scram v1.1.1 h1:VOMT+81stJgXW3CpHyqHN3AXDYIMsx56mEFrB37Mb/E=
+github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g=
+github.com/xdg-go/stringprep v1.0.3 h1:kdwGpVNwPFtjs98xCGkHjQtGKh86rDcRZN17QEMCOIs=
+github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
github.com/xeipuuv/gojsonschema v0.0.0-20180618132009-1d523034197f/go.mod h1:5yf86TLmAcydyeJq5YvxkGPE2fm/u4myDekKRoLuqhs=
@@ -843,6 +855,8 @@ github.com/xitongsys/parquet-go-source v0.0.0-20200817004010-026bad9b25d0/go.mod
github.com/xitongsys/parquet-go-source v0.0.0-20220315005136-aec0fe3e777c h1:UDtocVeACpnwauljUbeHD9UOjjcvF5kLUHruww7VT9A=
github.com/xitongsys/parquet-go-source v0.0.0-20220315005136-aec0fe3e777c/go.mod h1:qLb2Itmdcp7KPa5KZKvhE9U1q5bYSOmgeOckF/H2rQA=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
+github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d h1:splanxYIlg+5LfHAM6xpdFEAYOk8iySO56hMFq6uLyA=
+github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
@@ -854,6 +868,8 @@ go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4=
go.etcd.io/etcd v0.5.0-alpha.5.0.20200910180754-dd1b699fc489/go.mod h1:yVHk9ub3CSBatqGNg7GRmsnfLWtoW60w4eDYfh7vHDg=
+go.mongodb.org/mongo-driver v1.11.1 h1:QP0znIRTuL0jf1oBQoAoM0C6ZJfBK4kx0Uumtv1A7w8=
+go.mongodb.org/mongo-driver v1.11.1/go.mod h1:s7p5vEtfbeR1gYi6pnj3c3/urpbLv2T5Sfd6Rp2HBB8=
go.mozilla.org/pkcs7 v0.0.0-20200128120323-432b2356ecb1/go.mod h1:SNgMg+EgDFwmvSmLRTNKC5fegJjB7v23qTQ0XLGUNHk=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
@@ -881,6 +897,8 @@ golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=
+golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY=
+golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
@@ -953,6 +971,7 @@ golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210825183410-e898025ed96a/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
+golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.4.0 h1:Q5QPcMlvfxFTAPV0+07Xz/MpK9NTXu2VDUuy0FeMfaU=
golang.org/x/net v0.4.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@@ -1044,6 +1063,7 @@ golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210426230700-d19ff857e887/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210906170528-6f6e22806c34/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
@@ -1060,6 +1080,7 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.5.0 h1:OLmvp0KP+FVG99Ct/qFiL/Fhk4zp4QQnZ7b2U+5piUM=
golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
diff --git a/sdks/go/pkg/beam/io/mongodbio/coder.go b/sdks/go/pkg/beam/io/mongodbio/coder.go
new file mode 100644
index 00000000000..c140f9a8a25
--- /dev/null
+++ b/sdks/go/pkg/beam/io/mongodbio/coder.go
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mongodbio
+
+import (
+ "fmt"
+ "reflect"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "go.mongodb.org/mongo-driver/bson"
+ "go.mongodb.org/mongo-driver/bson/primitive"
+)
+
// init registers custom Beam coders for the MongoDB driver types that
// cross PCollection boundaries in this package: bson.M documents and
// primitive.ObjectID values.
func init() {
	// bson.M values are round-tripped through their canonical BSON
	// byte representation via bson.Marshal/Unmarshal.
	beam.RegisterCoder(
		reflect.TypeOf((*bson.M)(nil)).Elem(),
		encodeBSONMap,
		decodeBSONMap,
	)
	// ObjectIDs are encoded as their raw 12-byte value.
	beam.RegisterCoder(
		reflect.TypeOf((*primitive.ObjectID)(nil)).Elem(),
		encodeObjectID,
		decodeObjectID,
	)
}
+
+func encodeBSONMap(m bson.M) ([]byte, error) {
+ bytes, err := bson.Marshal(m)
+ if err != nil {
+ return nil, fmt.Errorf("error encoding BSON: %w", err)
+ }
+
+ return bytes, nil
+}
+
+func decodeBSONMap(bytes []byte) (bson.M, error) {
+ var out bson.M
+ if err := bson.Unmarshal(bytes, &out); err != nil {
+ return nil, fmt.Errorf("error decoding BSON: %w", err)
+ }
+
+ return out, nil
+}
+
+func encodeObjectID(objectID primitive.ObjectID) []byte {
+ return objectID[:]
+}
+
+func decodeObjectID(bytes []byte) primitive.ObjectID {
+ var out primitive.ObjectID
+
+ copy(out[:], bytes[:])
+
+ return out
+}
diff --git a/sdks/go/pkg/beam/io/mongodbio/coder_test.go b/sdks/go/pkg/beam/io/mongodbio/coder_test.go
new file mode 100644
index 00000000000..d5e3bb2974d
--- /dev/null
+++ b/sdks/go/pkg/beam/io/mongodbio/coder_test.go
@@ -0,0 +1,160 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mongodbio
+
+import (
+ "testing"
+
+ "github.com/google/go-cmp/cmp"
+ "go.mongodb.org/mongo-driver/bson"
+ "go.mongodb.org/mongo-driver/bson/primitive"
+)
+
// Test_encodeBSONMap verifies that bson.M values marshal to their
// expected BSON byte representation and that unmarshalable values
// produce an error.
func Test_encodeBSONMap(t *testing.T) {
	tests := []struct {
		name    string
		m       bson.M
		want    []byte
		wantErr bool
	}{
		{
			name:    "Encode bson.M",
			m:       bson.M{"key": "val"},
			want:    []byte{18, 0, 0, 0, 2, 107, 101, 121, 0, 4, 0, 0, 0, 118, 97, 108, 0, 0},
			wantErr: false,
		},
		{
			// Empty and nil maps both encode to the minimal 5-byte
			// BSON document.
			name:    "Encode empty bson.M",
			m:       bson.M{},
			want:    []byte{5, 0, 0, 0, 0},
			wantErr: false,
		},
		{
			name:    "Encode nil bson.M",
			m:       bson.M(nil),
			want:    []byte{5, 0, 0, 0, 0},
			wantErr: false,
		},
		{
			// Channels have no BSON representation, so Marshal fails;
			// want is left nil.
			name:    "Error - invalid bson.M",
			m:       bson.M{"key": make(chan int)},
			wantErr: true,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			got, err := encodeBSONMap(tt.m)
			if (err != nil) != tt.wantErr {
				t.Fatalf("encodeBSONMap() error = %v, wantErr %v", err, tt.wantErr)
			}

			if !cmp.Equal(got, tt.want) {
				t.Errorf("encodeBSONMap() got = %v, want %v", got, tt.want)
			}
		})
	}
}
+
// Test_decodeBSONMap verifies that BSON byte slices unmarshal to the
// expected bson.M values and that malformed input produces an error.
func Test_decodeBSONMap(t *testing.T) {
	tests := []struct {
		name    string
		bytes   []byte
		want    bson.M
		wantErr bool
	}{
		{
			name:    "Decode bson.M",
			bytes:   []byte{18, 0, 0, 0, 2, 107, 101, 121, 0, 4, 0, 0, 0, 118, 97, 108, 0, 0},
			want:    bson.M{"key": "val"},
			wantErr: false,
		},
		{
			name:    "Decode empty bson.M",
			bytes:   []byte{5, 0, 0, 0, 0},
			want:    bson.M{},
			wantErr: false,
		},
		{
			// An empty slice is not a valid BSON document; want is
			// left nil.
			name:    "Error - invalid bson.M",
			bytes:   []byte{},
			wantErr: true,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			got, err := decodeBSONMap(tt.bytes)
			if (err != nil) != tt.wantErr {
				t.Fatalf("decodeBSONMap() error = %v, wantErr %v", err, tt.wantErr)
			}

			if !cmp.Equal(got, tt.want) {
				t.Errorf("decodeBSONMap() got = %v, want %v", got, tt.want)
			}
		})
	}
}
+
// Test_encodeObjectID verifies that an ObjectID encodes to its raw
// 12-byte representation, including the all-zero NilObjectID.
func Test_encodeObjectID(t *testing.T) {
	tests := []struct {
		name     string
		objectID primitive.ObjectID
		want     []byte
	}{
		{
			name:     "Encode object ID",
			objectID: objectIDFromHex(t, "5f1b2c3d4e5f60708090a0b0"),
			want:     []byte{95, 27, 44, 61, 78, 95, 96, 112, 128, 144, 160, 176},
		},
		{
			name:     "Encode nil object ID",
			objectID: primitive.NilObjectID,
			want:     []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if got := encodeObjectID(tt.objectID); !cmp.Equal(got, tt.want) {
				t.Errorf("encodeObjectID() = %v, want %v", got, tt.want)
			}
		})
	}
}
+
// Test_decodeObjectID verifies that 12 raw bytes decode back to the
// matching ObjectID, the inverse of Test_encodeObjectID's cases.
func Test_decodeObjectID(t *testing.T) {
	tests := []struct {
		name  string
		bytes []byte
		want  primitive.ObjectID
	}{
		{
			name:  "Decode object ID",
			bytes: []byte{95, 27, 44, 61, 78, 95, 96, 112, 128, 144, 160, 176},
			want:  objectIDFromHex(t, "5f1b2c3d4e5f60708090a0b0"),
		},
		{
			name:  "Decode nil object ID",
			bytes: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
			want:  primitive.NilObjectID,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if got := decodeObjectID(tt.bytes); !cmp.Equal(got, tt.want) {
				t.Errorf("decodeObjectID() = %v, want %v", got, tt.want)
			}
		})
	}
}
diff --git a/sdks/go/pkg/beam/io/mongodbio/common.go b/sdks/go/pkg/beam/io/mongodbio/common.go
new file mode 100644
index 00000000000..9d6ffbeaa95
--- /dev/null
+++ b/sdks/go/pkg/beam/io/mongodbio/common.go
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package mongodbio contains transforms for reading from and writing to MongoDB.
+package mongodbio
+
+import (
+ "context"
+ "fmt"
+
+ "go.mongodb.org/mongo-driver/mongo"
+ "go.mongodb.org/mongo-driver/mongo/options"
+ "go.mongodb.org/mongo-driver/mongo/readpref"
+)
+
const (
	// bsonTag is the struct tag key used to map struct fields to BSON
	// document field names (presumably consumed by the structx helpers
	// added alongside this package — confirm against read/write code).
	bsonTag = "bson"
)

// mongoDBFn holds the connection parameters and per-worker lifecycle
// state shared by this package's DoFns. The exported fields identify
// the target collection; the unexported client and collection handles
// are established in Setup and released in Teardown.
type mongoDBFn struct {
	URI        string
	Database   string
	Collection string
	client     *mongo.Client
	collection *mongo.Collection
}
+
+func (fn *mongoDBFn) Setup(ctx context.Context) error {
+ client, err := newClient(ctx, fn.URI)
+ if err != nil {
+ return err
+ }
+
+ fn.client = client
+ fn.collection = client.Database(fn.Database).Collection(fn.Collection)
+
+ return nil
+}
+
+func newClient(ctx context.Context, uri string) (*mongo.Client, error) {
+ opts := options.Client().ApplyURI(uri)
+
+ client, err := mongo.Connect(ctx, opts)
+ if err != nil {
+ return nil, fmt.Errorf("error connecting to MongoDB: %w", err)
+ }
+
+ if err := client.Ping(ctx, readpref.Primary()); err != nil {
+ return nil, fmt.Errorf("error pinging MongoDB: %w", err)
+ }
+
+ return client, nil
+}
+
+func (fn *mongoDBFn) Teardown(ctx context.Context) error {
+ if err := fn.client.Disconnect(ctx); err != nil {
+ return fmt.Errorf("error disconnecting from MongoDB: %w", err)
+ }
+
+ return nil
+}
diff --git a/sdks/go/pkg/beam/io/mongodbio/example_test.go b/sdks/go/pkg/beam/io/mongodbio/example_test.go
new file mode 100644
index 00000000000..3e77303843e
--- /dev/null
+++ b/sdks/go/pkg/beam/io/mongodbio/example_test.go
@@ -0,0 +1,180 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mongodbio_test
+
+import (
+ "context"
+ "log"
+ "reflect"
+ "time"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/mongodbio"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
+ "go.mongodb.org/mongo-driver/bson"
+ "go.mongodb.org/mongo-driver/bson/primitive"
+)
+
// ExampleRead_default demonstrates reading a MongoDB collection into a
// PCollection of Event structs using the transform's default options.
func ExampleRead_default() {
	type Event struct {
		ID        primitive.ObjectID `bson:"_id"`
		Timestamp int64              `bson:"timestamp"`
		EventType int32              `bson:"event_type"`
	}

	beam.Init()
	p, s := beam.NewPipelineWithRoot()

	col := mongodbio.Read(
		s,
		"mongodb://localhost:27017",
		"demo",
		"events",
		reflect.TypeOf(Event{}),
	)
	debug.Print(s, col)

	if err := beamx.Run(context.Background(), p); err != nil {
		log.Fatalf("Failed to execute job: %v", err)
	}
}
+
// ExampleRead_options demonstrates reading with explicit options:
// $bucketAuto-based splitting, a custom bundle size, and a server-side
// filter on the documents to read.
func ExampleRead_options() {
	type Event struct {
		ID        primitive.ObjectID `bson:"_id"`
		Timestamp int64              `bson:"timestamp"`
		EventType int32              `bson:"event_type"`
	}

	beam.Init()
	p, s := beam.NewPipelineWithRoot()

	col := mongodbio.Read(
		s,
		"mongodb://localhost:27017",
		"demo",
		"events",
		reflect.TypeOf(Event{}),
		mongodbio.WithReadBucketAuto(true),
		mongodbio.WithReadBundleSize(32*1024*1024),
		mongodbio.WithReadFilter(bson.M{"timestamp": bson.M{"$gt": 1640995200000}}),
	)
	debug.Print(s, col)

	if err := beamx.Run(context.Background(), p); err != nil {
		log.Fatalf("Failed to execute job: %v", err)
	}
}
+
// ExampleWrite_default demonstrates writing a PCollection of Event
// structs (with caller-supplied IDs) to a MongoDB collection using the
// transform's default options.
func ExampleWrite_default() {
	type Event struct {
		ID        primitive.ObjectID `bson:"_id"`
		Timestamp int64              `bson:"timestamp"`
		EventType int32              `bson:"event_type"`
	}

	beam.Init()
	p, s := beam.NewPipelineWithRoot()

	input := []Event{
		{
			ID:        primitive.NewObjectIDFromTimestamp(time.UnixMilli(1640995200001)),
			Timestamp: 1640995200001,
			EventType: 1,
		},
		{
			ID:        primitive.NewObjectIDFromTimestamp(time.UnixMilli(1640995200002)),
			Timestamp: 1640995200002,
			EventType: 2,
		},
	}

	col := beam.CreateList(s, input)
	mongodbio.Write(s, "mongodb://localhost:27017", "demo", "events", col)

	if err := beamx.Run(context.Background(), p); err != nil {
		log.Fatalf("Failed to execute job: %v", err)
	}
}
+
// ExampleWrite_options demonstrates writing with explicit options: a
// custom batch size and unordered bulk writes.
func ExampleWrite_options() {
	type Event struct {
		ID        primitive.ObjectID `bson:"_id"`
		Timestamp int64              `bson:"timestamp"`
		EventType int32              `bson:"event_type"`
	}

	beam.Init()
	p, s := beam.NewPipelineWithRoot()

	input := []Event{
		{
			ID:        primitive.NewObjectIDFromTimestamp(time.UnixMilli(1640995200001)),
			Timestamp: 1640995200001,
			EventType: 1,
		},
		{
			ID:        primitive.NewObjectIDFromTimestamp(time.UnixMilli(1640995200002)),
			Timestamp: 1640995200002,
			EventType: 2,
		},
	}

	col := beam.CreateList(s, input)
	mongodbio.Write(
		s,
		"mongodb://localhost:27017",
		"demo",
		"events",
		col,
		mongodbio.WithWriteBatchSize(500),
		mongodbio.WithWriteOrdered(false),
	)

	if err := beamx.Run(context.Background(), p); err != nil {
		log.Fatalf("Failed to execute job: %v", err)
	}
}
+
// ExampleWrite_generateID demonstrates writing elements whose struct
// type has no "_id" field; the PCollection returned by Write
// (presumably the generated document IDs — confirm against Write's
// documentation) is printed for inspection.
func ExampleWrite_generateID() {
	type Event struct {
		Timestamp int64 `bson:"timestamp"`
		EventType int32 `bson:"event_type"`
	}

	beam.Init()
	p, s := beam.NewPipelineWithRoot()

	input := []Event{
		{
			Timestamp: 1640995200001,
			EventType: 1,
		},
		{
			Timestamp: 1640995200002,
			EventType: 1,
		},
	}

	col := beam.CreateList(s, input)
	ids := mongodbio.Write(s, "mongodb://localhost:27017", "demo", "events", col)
	debug.Print(s, ids)

	if err := beamx.Run(context.Background(), p); err != nil {
		log.Fatalf("Failed to execute job: %v", err)
	}
}
diff --git a/sdks/go/pkg/beam/io/mongodbio/helper_test.go b/sdks/go/pkg/beam/io/mongodbio/helper_test.go
new file mode 100644
index 00000000000..c5a63c15adb
--- /dev/null
+++ b/sdks/go/pkg/beam/io/mongodbio/helper_test.go
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mongodbio
+
+import (
+ "testing"
+
+ "go.mongodb.org/mongo-driver/bson/primitive"
+)
+
+func objectIDFromHex(t *testing.T, hex string) primitive.ObjectID {
+ t.Helper()
+
+ id, err := primitive.ObjectIDFromHex(hex)
+ if err != nil {
+ t.Fatalf("error parsing hex string to primitive.ObjectID: %v", err)
+ }
+
+ return id
+}
diff --git a/sdks/go/pkg/beam/io/mongodbio/read.go b/sdks/go/pkg/beam/io/mongodbio/read.go
new file mode 100644
index 00000000000..e09f2f1e1af
--- /dev/null
+++ b/sdks/go/pkg/beam/io/mongodbio/read.go
@@ -0,0 +1,492 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mongodbio
+
+import (
+ "context"
+ "fmt"
+ "math"
+ "reflect"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/util/structx"
+ "go.mongodb.org/mongo-driver/bson"
+ "go.mongodb.org/mongo-driver/mongo"
+ "go.mongodb.org/mongo-driver/mongo/options"
+ "go.mongodb.org/mongo-driver/mongo/readpref"
+)
+
const (
	// defaultReadBundleSize is the default bundle size in bytes (64 MB) used
	// to partition a collection when no WithReadBundleSize option is given.
	defaultReadBundleSize = 64 * 1024 * 1024

	// minSplitVectorChunkSize and maxSplitVectorChunkSize (1 MB and 1 GB)
	// bound the maxChunkSizeBytes argument passed to MongoDB's splitVector
	// command.
	minSplitVectorChunkSize = 1024 * 1024
	maxSplitVectorChunkSize = 1024 * 1024 * 1024

	// maxBucketCount caps the bucket count requested from the $bucketAuto
	// aggregation stage, which takes an int32.
	maxBucketCount = math.MaxInt32
)

// init registers the DoFns and emitted element types with the Beam SDK so
// they can be serialized and executed by distributed runners.
func init() {
	register.DoFn3x1[context.Context, []byte, func(bson.M), error](&bucketAutoFn{})
	register.DoFn3x1[context.Context, []byte, func(bson.M), error](&splitVectorFn{})
	register.Emitter1[bson.M]()

	register.DoFn3x1[context.Context, bson.M, func(beam.Y), error](&readFn{})
	register.Emitter1[beam.Y]()
}
+
+// Read reads a MongoDB collection and returns a PCollection<T> for a given type T. T must be a
+// struct with exported fields that should have a "bson" tag. By default, the transform uses the
+// MongoDB internal splitVector command to split the collection into bundles. The transform can be
+// configured to use the $bucketAuto aggregation instead to support reading from MongoDB Atlas
+// where the splitVector command is not allowed. This is enabled by passing the ReadOptionFn
+// WithReadBucketAuto(true).
+//
+// The Read transform has the required parameters:
+// - s: the scope of the pipeline
+// - uri: the MongoDB connection string
+// - database: the MongoDB database to read from
+// - collection: the MongoDB collection to read from
+// - t: the type of the elements in the collection
+//
+// The Read transform takes a variadic number of ReadOptionFn which can set the ReadOption fields:
+// - BucketAuto: whether to use the bucketAuto aggregation to split the collection into bundles.
+// Defaults to false
+// - Filter: a bson.M map that is used to filter the documents in the collection. Defaults to nil,
+// which means no filter is applied
+// - BundleSize: the size in bytes to bundle the documents into when reading. Defaults to
+// 64 * 1024 * 1024 (64 MB)
+func Read(
+ s beam.Scope,
+ uri string,
+ database string,
+ collection string,
+ t reflect.Type,
+ opts ...ReadOptionFn,
+) beam.PCollection {
+ s = s.Scope("mongodbio.Read")
+
+ option := &ReadOption{
+ BundleSize: defaultReadBundleSize,
+ }
+
+ for _, opt := range opts {
+ if err := opt(option); err != nil {
+ panic(fmt.Sprintf("mongodbio.Read: invalid option: %v", err))
+ }
+ }
+
+ imp := beam.Impulse(s)
+
+ var bundled beam.PCollection
+
+ if option.BucketAuto {
+ bundled = beam.ParDo(s, newBucketAutoFn(uri, database, collection, option), imp)
+ } else {
+ bundled = beam.ParDo(s, newSplitVectorFn(uri, database, collection, option), imp)
+ }
+
+ return beam.ParDo(
+ s,
+ newReadFn(uri, database, collection, t, option),
+ bundled,
+ beam.TypeDefinition{Var: beam.YType, T: t},
+ )
+}
+
// bucketAutoFn is a DoFn that partitions a collection into ID ranges using
// the MongoDB $bucketAuto aggregation stage.
type bucketAutoFn struct {
	mongoDBFn
	// BundleSize is the target size in bytes of each emitted bundle.
	BundleSize int64
}

// newBucketAutoFn creates a bucketAutoFn from the connection parameters and
// the resolved read options.
func newBucketAutoFn(
	uri string,
	database string,
	collection string,
	option *ReadOption,
) *bucketAutoFn {
	return &bucketAutoFn{
		mongoDBFn: mongoDBFn{
			URI:        uri,
			Database:   database,
			Collection: collection,
		},
		BundleSize: option.BundleSize,
	}
}
+
+func (fn *bucketAutoFn) ProcessElement(
+ ctx context.Context,
+ _ []byte,
+ emit func(bson.M),
+) error {
+ collectionSize, err := fn.getCollectionSize(ctx)
+ if err != nil {
+ return err
+ }
+
+ if collectionSize == 0 {
+ return nil
+ }
+
+ bucketCount := calculateBucketCount(collectionSize, fn.BundleSize)
+
+ buckets, err := fn.getBuckets(ctx, bucketCount)
+ if err != nil {
+ return err
+ }
+
+ idFilters := idFiltersFromBuckets(buckets)
+
+ for _, filter := range idFilters {
+ emit(filter)
+ }
+
+ return nil
+}
+
// collStats holds the subset of the collStats command output used here: the
// size of the collection's data in bytes.
type collStats struct {
	Size int64 `bson:"size"`
}

// getCollectionSize returns the collection's data size in bytes, reading
// from the primary so the statistics are as current as possible.
func (fn *bucketAutoFn) getCollectionSize(ctx context.Context) (int64, error) {
	cmd := bson.M{"collStats": fn.Collection}
	opts := options.RunCmd().SetReadPreference(readpref.Primary())

	var stats collStats
	if err := fn.collection.Database().RunCommand(ctx, cmd, opts).Decode(&stats); err != nil {
		return 0, fmt.Errorf("error executing collStats command: %w", err)
	}

	return stats.Size, nil
}
+
// calculateBucketCount returns the number of $bucketAuto buckets needed to
// split a collection of collectionSize bytes into bundles of at most
// bundleSize bytes (i.e. the ceiling of collectionSize / bundleSize), capped
// at math.MaxInt32 because the $bucketAuto stage takes an int32 bucket count.
// It panics if bundleSize is not greater than 0.
func calculateBucketCount(collectionSize int64, bundleSize int64) int32 {
	// Reject zero as well as negative values: the previous check used
	// `bundleSize < 0`, which let bundleSize == 0 through to an integer
	// divide-by-zero below instead of this descriptive panic. (The panic
	// message also had a "monogdbio" typo.)
	if bundleSize <= 0 {
		panic("mongodbio.calculateBucketCount: bundle size must be greater than 0")
	}

	count := collectionSize / bundleSize
	if collectionSize%bundleSize != 0 {
		count++ // round up so the final partial bundle gets its own bucket
	}

	if count > int64(math.MaxInt32) {
		count = math.MaxInt32
	}

	return int32(count)
}
+
// bucket models one $bucketAuto output document; its _id carries the min and
// max boundary values of the bucket's ID range.
type bucket struct {
	ID minMax `bson:"_id"`
}

// minMax holds the boundary values of a bucket's ID range.
type minMax struct {
	Min any `bson:"min"`
	Max any `bson:"max"`
}

// getBuckets runs a $bucketAuto aggregation grouped on _id, asking the
// server to split the collection into count buckets.
func (fn *bucketAutoFn) getBuckets(ctx context.Context, count int32) ([]bucket, error) {
	pipeline := mongo.Pipeline{bson.D{{
		Key: "$bucketAuto",
		Value: bson.M{
			"groupBy": "$_id",
			"buckets": count,
		},
	}}}

	cursor, err := fn.collection.Aggregate(ctx, pipeline)
	if err != nil {
		return nil, fmt.Errorf("error executing bucketAuto aggregation: %w", err)
	}

	var buckets []bucket
	if err = cursor.All(ctx, &buckets); err != nil {
		return nil, fmt.Errorf("error decoding buckets: %w", err)
	}

	return buckets, nil
}
+
+func idFiltersFromBuckets(buckets []bucket) []bson.M {
+ idFilters := make([]bson.M, len(buckets))
+
+ for i := 0; i < len(buckets); i++ {
+ filter := bson.M{}
+
+ if i != 0 {
+ filter["$gt"] = buckets[i].ID.Min
+ }
+
+ if i != len(buckets)-1 {
+ filter["$lte"] = buckets[i].ID.Max
+ }
+
+ if len(filter) == 0 {
+ idFilters[i] = filter
+ } else {
+ idFilters[i] = bson.M{"_id": filter}
+ }
+ }
+
+ return idFilters
+}
+
// splitVectorFn is a DoFn that partitions a collection into ID ranges using
// the MongoDB internal splitVector command.
type splitVectorFn struct {
	mongoDBFn
	// BundleSize is the target size in bytes of each emitted bundle.
	BundleSize int64
}

// newSplitVectorFn creates a splitVectorFn from the connection parameters
// and the resolved read options.
func newSplitVectorFn(
	uri string,
	database string,
	collection string,
	option *ReadOption,
) *splitVectorFn {
	return &splitVectorFn{
		mongoDBFn: mongoDBFn{
			URI:        uri,
			Database:   database,
			Collection: collection,
		},
		BundleSize: option.BundleSize,
	}
}
+
+func (fn *splitVectorFn) ProcessElement(
+ ctx context.Context,
+ _ []byte,
+ emit func(bson.M),
+) error {
+ chunkSize := getChunkSize(fn.BundleSize)
+
+ splitKeys, err := fn.getSplitKeys(ctx, chunkSize)
+ if err != nil {
+ return err
+ }
+
+ idFilters := idFiltersFromSplits(splitKeys)
+
+ for _, filter := range idFilters {
+ emit(filter)
+ }
+
+ return nil
+}
+
+func getChunkSize(bundleSize int64) int64 {
+ var chunkSize int64
+
+ if bundleSize < minSplitVectorChunkSize {
+ chunkSize = minSplitVectorChunkSize
+ } else if bundleSize > maxSplitVectorChunkSize {
+ chunkSize = maxSplitVectorChunkSize
+ } else {
+ chunkSize = bundleSize
+ }
+
+ return chunkSize
+}
+
// splitVector holds the subset of the splitVector command output used here.
type splitVector struct {
	SplitKeys []splitKey `bson:"splitKeys"`
}

// splitKey is a boundary document returned by splitVector; only _id is used.
type splitKey struct {
	ID any `bson:"_id"`
}

// getSplitKeys runs the splitVector command on the database.collection
// namespace, keyed on _id, to obtain boundaries that split the collection
// into chunks of at most chunkSize bytes. It reads from the primary.
func (fn *splitVectorFn) getSplitKeys(ctx context.Context, chunkSize int64) ([]splitKey, error) {
	cmd := bson.D{
		{Key: "splitVector", Value: fmt.Sprintf("%s.%s", fn.Database, fn.Collection)},
		{Key: "keyPattern", Value: bson.D{{Key: "_id", Value: 1}}},
		{Key: "maxChunkSizeBytes", Value: chunkSize},
	}

	opts := options.RunCmd().SetReadPreference(readpref.Primary())

	var vector splitVector
	if err := fn.collection.Database().RunCommand(ctx, cmd, opts).Decode(&vector); err != nil {
		return nil, fmt.Errorf("error executing splitVector command: %w", err)
	}

	return vector.SplitKeys, nil
}
+
+func idFiltersFromSplits(splitKeys []splitKey) []bson.M {
+ idFilters := make([]bson.M, len(splitKeys)+1)
+
+ for i := 0; i < len(splitKeys)+1; i++ {
+ filter := bson.M{}
+
+ if i > 0 {
+ filter["$gt"] = splitKeys[i-1].ID
+ }
+
+ if i < len(splitKeys) {
+ filter["$lte"] = splitKeys[i].ID
+ }
+
+ if len(filter) == 0 {
+ idFilters[i] = filter
+ } else {
+ idFilters[i] = bson.M{"_id": filter}
+ }
+ }
+
+ return idFilters
+}
+
// readFn is a DoFn that executes a find query for each ID-range filter it
// receives and emits the decoded documents.
type readFn struct {
	mongoDBFn
	// Filter is the user-supplied filter serialized to BSON bytes so the
	// DoFn itself can be encoded and shipped to workers.
	Filter []byte
	// Type is the element type documents are decoded into.
	Type beam.EncodedType
	// projection and filter are derived in Setup on each worker and are not
	// serialized (lowercase, unexported).
	projection bson.D
	filter     bson.M
}

// newReadFn creates a readFn, serializing the optional filter to BSON bytes.
// It panics if the filter cannot be encoded.
func newReadFn(
	uri string,
	database string,
	collection string,
	t reflect.Type,
	option *ReadOption,
) *readFn {
	filter, err := encodeBSONMap(option.Filter)
	if err != nil {
		panic(fmt.Sprintf("mongodbio.newReadFn: %v", err))
	}

	return &readFn{
		mongoDBFn: mongoDBFn{
			URI:        uri,
			Database:   database,
			Collection: collection,
		},
		Filter: filter,
		Type:   beam.EncodedType{T: t},
	}
}
+
// Setup connects to MongoDB (via the embedded mongoDBFn) and rebuilds the
// filter and projection values used by ProcessElement.
func (fn *readFn) Setup(ctx context.Context) error {
	if err := fn.mongoDBFn.Setup(ctx); err != nil {
		return err
	}

	// The filter travels between workers as BSON bytes; decode it back into
	// a bson.M here.
	filter, err := decodeBSONMap(fn.Filter)
	if err != nil {
		return err
	}

	fn.filter = filter
	// Project only the fields declared (via bson tags) on the target type.
	fn.projection = inferProjection(fn.Type.T, bsonTag)

	return nil
}
+
+func inferProjection(t reflect.Type, tagKey string) bson.D {
+ names := structx.InferFieldNames(t, tagKey)
+ if len(names) == 0 {
+ panic("mongodbio.inferProjection: no names to infer projection from")
+ }
+
+ projection := make(bson.D, len(names))
+
+ for i, name := range names {
+ projection[i] = bson.E{Key: name, Value: 1}
+ }
+
+ return projection
+}
+
// ProcessElement queries the collection with the bundle's ID-range filter
// (combined with any user filter) and emits each decoded document.
func (fn *readFn) ProcessElement(
	ctx context.Context,
	elem bson.M,
	emit func(beam.Y),
) (err error) {
	// elem is the ID-range filter produced by the partitioning DoFn.
	mergedFilter := mergeFilters(elem, fn.filter)

	cursor, err := fn.findDocuments(ctx, fn.projection, mergedFilter)
	if err != nil {
		return err
	}

	// Always close the cursor. If the body already failed, keep that error
	// and only log the close error; otherwise surface the close error
	// through the named return value.
	defer func() {
		closeErr := cursor.Close(ctx)

		if err != nil {
			if closeErr != nil {
				log.Errorf(ctx, "error closing cursor: %v", closeErr)
			}
			return
		}

		err = closeErr
	}()

	for cursor.Next(ctx) {
		// Decode each document into a fresh value of the target struct type.
		value, err := decodeDocument(cursor, fn.Type.T)
		if err != nil {
			return err
		}

		emit(value)
	}

	// Report any iteration error that terminated the loop early.
	return cursor.Err()
}
+
+func mergeFilters(idFilter bson.M, customFilter bson.M) bson.M {
+ if len(idFilter) == 0 {
+ return customFilter
+ }
+
+ if len(customFilter) == 0 {
+ return idFilter
+ }
+
+ return bson.M{
+ "$and": []bson.M{idFilter, customFilter},
+ }
+}
+
// findDocuments runs a find query with the given projection and filter.
// AllowDiskUse lets the server spill to disk for large result sets. The
// caller owns the returned cursor and must close it.
func (fn *readFn) findDocuments(
	ctx context.Context,
	projection bson.D,
	filter bson.M,
) (*mongo.Cursor, error) {
	opts := options.Find().SetProjection(projection).SetAllowDiskUse(true)

	cursor, err := fn.collection.Find(ctx, filter, opts)
	if err != nil {
		return nil, fmt.Errorf("error finding documents: %w", err)
	}

	return cursor, nil
}
+
+func decodeDocument(cursor *mongo.Cursor, t reflect.Type) (any, error) {
+ out := reflect.New(t).Interface()
+ if err := cursor.Decode(out); err != nil {
+ return nil, fmt.Errorf("error decoding document: %w", err)
+ }
+
+ value := reflect.ValueOf(out).Elem().Interface()
+
+ return value, nil
+}
diff --git a/sdks/go/pkg/beam/io/mongodbio/read_option.go b/sdks/go/pkg/beam/io/mongodbio/read_option.go
new file mode 100644
index 00000000000..b724c306a81
--- /dev/null
+++ b/sdks/go/pkg/beam/io/mongodbio/read_option.go
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mongodbio
+
+import (
+ "errors"
+
+ "go.mongodb.org/mongo-driver/bson"
+)
+
+// ReadOption represents options for reading from MongoDB.
+type ReadOption struct {
+ BucketAuto bool
+ Filter bson.M
+ BundleSize int64
+}
+
+// ReadOptionFn is a function that configures a ReadOption.
+type ReadOptionFn func(option *ReadOption) error
+
+// WithReadBucketAuto configures the ReadOption whether to use the bucketAuto aggregation stage.
+func WithReadBucketAuto(bucketAuto bool) ReadOptionFn {
+ return func(o *ReadOption) error {
+ o.BucketAuto = bucketAuto
+ return nil
+ }
+}
+
+// WithReadFilter configures the ReadOption to use the provided filter.
+func WithReadFilter(filter bson.M) ReadOptionFn {
+ return func(o *ReadOption) error {
+ o.Filter = filter
+ return nil
+ }
+}
+
+// WithReadBundleSize configures the ReadOption to use the provided bundle size in bytes.
+func WithReadBundleSize(bundleSize int64) ReadOptionFn {
+ return func(o *ReadOption) error {
+ if bundleSize <= 0 {
+ return errors.New("bundle size must be greater than 0")
+ }
+
+ o.BundleSize = bundleSize
+ return nil
+ }
+}
diff --git a/sdks/go/pkg/beam/io/mongodbio/read_option_test.go b/sdks/go/pkg/beam/io/mongodbio/read_option_test.go
new file mode 100644
index 00000000000..d4fe4dfa63a
--- /dev/null
+++ b/sdks/go/pkg/beam/io/mongodbio/read_option_test.go
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mongodbio
+
+import (
+ "testing"
+
+ "github.com/google/go-cmp/cmp"
+ "go.mongodb.org/mongo-driver/bson"
+)
+
// TestWithReadBucketAuto verifies that the option function stores the
// bucketAuto flag on the ReadOption without error.
func TestWithReadBucketAuto(t *testing.T) {
	tests := []struct {
		name       string
		bucketAuto bool
		want       bool
		wantErr    bool
	}{
		{
			name:       "Set bucket auto to true",
			bucketAuto: true,
			want:       true,
			wantErr:    false,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			var option ReadOption

			if err := WithReadBucketAuto(tt.bucketAuto)(&option); (err != nil) != tt.wantErr {
				t.Fatalf("WithReadBucketAuto() error = %v, wantErr %v", err, tt.wantErr)
			}

			if option.BucketAuto != tt.want {
				t.Errorf("option.BucketAuto = %v, want %v", option.BucketAuto, tt.want)
			}
		})
	}
}

// TestWithReadFilter verifies that the option function stores the filter
// document on the ReadOption without error.
func TestWithReadFilter(t *testing.T) {
	tests := []struct {
		name    string
		filter  bson.M
		want    bson.M
		wantErr bool
	}{
		{
			name:    "Set filter to {\"key\": \"value\"}",
			filter:  bson.M{"key": "value"},
			want:    bson.M{"key": "value"},
			wantErr: false,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			var option ReadOption

			if err := WithReadFilter(tt.filter)(&option); (err != nil) != tt.wantErr {
				t.Fatalf("WithReadFilter() error = %v, wantErr %v", err, tt.wantErr)
			}

			if !cmp.Equal(option.Filter, tt.want) {
				t.Errorf("option.Filter = %v, want %v", option.Filter, tt.want)
			}
		})
	}
}

// TestWithReadBundleSize verifies that positive bundle sizes are stored and
// that non-positive sizes are rejected with an error (leaving the zero value).
func TestWithReadBundleSize(t *testing.T) {
	tests := []struct {
		name       string
		bundleSize int64
		want       int64
		wantErr    bool
	}{
		{
			name:       "Set bundle size to 1024",
			bundleSize: 1024,
			want:       1024,
			wantErr:    false,
		},
		{
			name:       "Error - bundle size must be greater than 0",
			bundleSize: 0,
			wantErr:    true,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			var option ReadOption

			if err := WithReadBundleSize(tt.bundleSize)(&option); (err != nil) != tt.wantErr {
				t.Fatalf("WithReadBundleSize() error = %v, wantErr %v", err, tt.wantErr)
			}

			if option.BundleSize != tt.want {
				t.Errorf("option.BundleSize = %v, want %v", option.BundleSize, tt.want)
			}
		})
	}
}
diff --git a/sdks/go/pkg/beam/io/mongodbio/read_test.go b/sdks/go/pkg/beam/io/mongodbio/read_test.go
new file mode 100644
index 00000000000..5899457d5a8
--- /dev/null
+++ b/sdks/go/pkg/beam/io/mongodbio/read_test.go
@@ -0,0 +1,393 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mongodbio
+
+import (
+ "math"
+ "reflect"
+ "testing"
+
+ "github.com/google/go-cmp/cmp"
+ "go.mongodb.org/mongo-driver/bson"
+)
+
// Test_calculateBucketCount checks the ceiling division of collection size
// by bundle size and the cap at math.MaxInt32.
func Test_calculateBucketCount(t *testing.T) {
	tests := []struct {
		name           string
		collectionSize int64
		bundleSize     int64
		want           int32
	}{
		{
			name:           "Return ceiling of collection size / bundle size",
			collectionSize: 3 * 1024 * 1024,
			bundleSize:     2 * 1024 * 1024,
			want:           2,
		},
		{
			name:           "Return max int32 when calculated count is greater than max int32",
			collectionSize: 1024 * 1024 * 1024 * 1024,
			bundleSize:     1,
			want:           math.MaxInt32,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if got := calculateBucketCount(tt.collectionSize, tt.bundleSize); got != tt.want {
				t.Errorf("calculateBucketCount() = %v, want %v", got, tt.want)
			}
		})
	}
}

// Test_calculateBucketCountPanic checks that a non-positive bundle size
// panics rather than producing a bogus count.
func Test_calculateBucketCountPanic(t *testing.T) {
	t.Run("Panic when bundleSize is not greater than 0", func(t *testing.T) {
		defer func() {
			if r := recover(); r == nil {
				t.Errorf("calculateBucketCount() does not panic")
			}
		}()

		calculateBucketCount(1024, 0)
	})
}
+
// Test_idFiltersFromBuckets checks the _id range filters produced for three,
// two, and one $bucketAuto buckets, including the open-ended first and last
// ranges and the empty match-all filter for a single bucket.
func Test_idFiltersFromBuckets(t *testing.T) {
	tests := []struct {
		name    string
		buckets []bucket
		want    []bson.M
	}{
		{
			name: "Create one $lte filter for start range, one $gt filter for end range, and filters with both " +
				"$lte and $gt for ranges in between when there are three or more bucket elements",
			buckets: []bucket{
				{
					ID: minMax{
						Min: objectIDFromHex(t, "6384e03f24f854c1a8ce5378"),
						Max: objectIDFromHex(t, "6384e03f24f854c1a8ce5380"),
					},
				},
				{
					ID: minMax{
						Min: objectIDFromHex(t, "6384e03f24f854c1a8ce5380"),
						Max: objectIDFromHex(t, "6384e03f24f854c1a8ce5382"),
					},
				},
				{
					ID: minMax{
						Min: objectIDFromHex(t, "6384e03f24f854c1a8ce5382"),
						Max: objectIDFromHex(t, "6384e03f24f854c1a8ce5384"),
					},
				},
			},
			want: []bson.M{
				{
					"_id": bson.M{
						"$lte": objectIDFromHex(t, "6384e03f24f854c1a8ce5380"),
					},
				},
				{
					"_id": bson.M{
						"$gt":  objectIDFromHex(t, "6384e03f24f854c1a8ce5380"),
						"$lte": objectIDFromHex(t, "6384e03f24f854c1a8ce5382"),
					},
				},
				{
					"_id": bson.M{
						"$gt": objectIDFromHex(t, "6384e03f24f854c1a8ce5382"),
					},
				},
			},
		},
		{
			name: "Create one $lte filter for start range and one $gt filter for end range when there are two " +
				"bucket elements",
			buckets: []bucket{
				{
					ID: minMax{
						Min: objectIDFromHex(t, "6384e03f24f854c1a8ce5378"),
						Max: objectIDFromHex(t, "6384e03f24f854c1a8ce5380"),
					},
				},
				{
					ID: minMax{
						Min: objectIDFromHex(t, "6384e03f24f854c1a8ce5380"),
						Max: objectIDFromHex(t, "6384e03f24f854c1a8ce5382"),
					},
				},
			},
			want: []bson.M{
				{
					"_id": bson.M{
						"$lte": objectIDFromHex(t, "6384e03f24f854c1a8ce5380"),
					},
				},
				{
					"_id": bson.M{
						"$gt": objectIDFromHex(t, "6384e03f24f854c1a8ce5380"),
					},
				},
			},
		},
		{
			name: "Create an empty filter when there is one bucket element",
			buckets: []bucket{
				{
					ID: minMax{
						Min: objectIDFromHex(t, "6384e03f24f854c1a8ce5378"),
						Max: objectIDFromHex(t, "6384e03f24f854c1a8ce5380"),
					},
				},
			},
			want: []bson.M{{}},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if got := idFiltersFromBuckets(tt.buckets); !cmp.Equal(got, tt.want) {
				t.Errorf("idFiltersFromBuckets() = %v, want %v", got, tt.want)
			}
		})
	}
}
+
// Test_getChunkSize checks that the bundle size is clamped to the
// splitVector command's accepted range of 1 MB to 1 GB.
func Test_getChunkSize(t *testing.T) {
	tests := []struct {
		name       string
		bundleSize int64
		want       int64
	}{
		{
			name:       "Return 1 MB if bundle size is less than 1 MB",
			bundleSize: 1024,
			want:       1024 * 1024,
		},
		{
			name:       "Return 1 GB if bundle size is greater than 1 GB",
			bundleSize: 2 * 1024 * 1024 * 1024,
			want:       1024 * 1024 * 1024,
		},
		{
			name:       "Return bundle size if bundle size is between 1 MB and 1 GB",
			bundleSize: 4 * 1024 * 1024,
			want:       4 * 1024 * 1024,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if got := getChunkSize(tt.bundleSize); got != tt.want {
				t.Errorf("getChunkSize() = %v, want %v", got, tt.want)
			}
		})
	}
}

// Test_idFiltersFromSplits checks the _id range filters produced for two,
// one, and zero splitVector keys: n keys always yield n+1 filters, with the
// zero-key case producing a single empty (match-all) filter.
func Test_idFiltersFromSplits(t *testing.T) {
	tests := []struct {
		name      string
		splitKeys []splitKey
		want      []bson.M
	}{
		{
			name: "Create one $lte filter for start range, one $gt filter for end range, and filters with both " +
				"$lte and $gt for ranges in between when there are two or more splitKey elements",
			splitKeys: []splitKey{
				{
					ID: objectIDFromHex(t, "6384e03f24f854c1a8ce5380"),
				},
				{
					ID: objectIDFromHex(t, "6384e03f24f854c1a8ce5382"),
				},
			},
			want: []bson.M{
				{
					"_id": bson.M{
						"$lte": objectIDFromHex(t, "6384e03f24f854c1a8ce5380"),
					},
				},
				{
					"_id": bson.M{
						"$gt":  objectIDFromHex(t, "6384e03f24f854c1a8ce5380"),
						"$lte": objectIDFromHex(t, "6384e03f24f854c1a8ce5382"),
					},
				},
				{
					"_id": bson.M{
						"$gt": objectIDFromHex(t, "6384e03f24f854c1a8ce5382"),
					},
				},
			},
		},
		{
			name: "Create one $lte filter for start range and one $gt filter for end range when there is one " +
				"splitKey element",
			splitKeys: []splitKey{
				{
					ID: objectIDFromHex(t, "6384e03f24f854c1a8ce5380"),
				},
			},
			want: []bson.M{
				{
					"_id": bson.M{
						"$lte": objectIDFromHex(t, "6384e03f24f854c1a8ce5380"),
					},
				},
				{
					"_id": bson.M{
						"$gt": objectIDFromHex(t, "6384e03f24f854c1a8ce5380"),
					},
				},
			},
		},
		{
			name:      "Create an empty filter when there are no splitKey elements",
			splitKeys: nil,
			want:      []bson.M{{}},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if got := idFiltersFromSplits(tt.splitKeys); !cmp.Equal(got, tt.want) {
				t.Errorf("idFiltersFromSplits() = %v, want %v", got, tt.want)
			}
		})
	}
}
+
// Test_inferProjection checks that a projection is built from bson tags and
// that fields tagged "-" are excluded.
func Test_inferProjection(t *testing.T) {
	type doc struct {
		Field1 string `bson:"field1"`
		Field2 string `bson:"field2"`
		Field3 string `bson:"-"`
	}

	tests := []struct {
		name   string
		t      reflect.Type
		tagKey string
		want   bson.D
	}{
		{
			name:   "Infer projection from struct bson tags",
			t:      reflect.TypeOf(doc{}),
			tagKey: "bson",
			want: bson.D{
				{Key: "field1", Value: 1},
				{Key: "field2", Value: 1},
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if got := inferProjection(tt.t, tt.tagKey); !cmp.Equal(got, tt.want) {
				t.Errorf("inferProjection() = %v, want %v", got, tt.want)
			}
		})
	}
}

// Test_inferProjectionPanic checks that a type without projectable fields
// causes a panic.
func Test_inferProjectionPanic(t *testing.T) {
	type doc struct{}

	t.Run("Panic when type has no fields to infer", func(t *testing.T) {
		defer func() {
			if r := recover(); r == nil {
				t.Errorf("inferProjection() does not panic")
			}
		}()

		inferProjection(reflect.TypeOf(doc{}), "bson")
	})
}
+
// Test_mergeFilters checks the four combinations of empty/non-empty ID and
// custom filters: both present yields an $and, one empty yields the other,
// and both empty yields an empty filter.
func Test_mergeFilters(t *testing.T) {
	tests := []struct {
		name     string
		idFilter bson.M
		filter   bson.M
		want     bson.M
	}{
		{
			name: "Returned merged ID filter and custom filter in an $and filter",
			idFilter: bson.M{
				"_id": bson.M{
					"$gte": 10,
				},
			},
			filter: bson.M{
				"key": bson.M{
					"$ne": "value",
				},
			},
			want: bson.M{
				"$and": []bson.M{
					{
						"_id": bson.M{
							"$gte": 10,
						},
					},
					{
						"key": bson.M{
							"$ne": "value",
						},
					},
				},
			},
		},
		{
			name: "Return only ID filter when custom filter is empty",
			idFilter: bson.M{
				"_id": bson.M{
					"$gte": 10,
				},
			},
			filter: bson.M{},
			want: bson.M{
				"_id": bson.M{
					"$gte": 10,
				},
			},
		},
		{
			name:     "Return only custom filter when ID filter is empty",
			idFilter: bson.M{},
			filter: bson.M{
				"key": bson.M{
					"$ne": "value",
				},
			},
			want: bson.M{
				"key": bson.M{
					"$ne": "value",
				},
			},
		},
		{
			name:     "Return empty filter when both ID filter and custom filter are empty",
			idFilter: bson.M{},
			filter:   bson.M{},
			want:     bson.M{},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if got := mergeFilters(tt.idFilter, tt.filter); !cmp.Equal(got, tt.want) {
				t.Errorf("mergeFilters() = %v, want %v", got, tt.want)
			}
		})
	}
}
diff --git a/sdks/go/pkg/beam/io/mongodbio/write.go b/sdks/go/pkg/beam/io/mongodbio/write.go
new file mode 100644
index 00000000000..2332e3ba981
--- /dev/null
+++ b/sdks/go/pkg/beam/io/mongodbio/write.go
@@ -0,0 +1,204 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mongodbio
+
+import (
+ "context"
+ "fmt"
+ "reflect"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/util/structx"
+ "go.mongodb.org/mongo-driver/bson"
+ "go.mongodb.org/mongo-driver/bson/primitive"
+ "go.mongodb.org/mongo-driver/mongo"
+ "go.mongodb.org/mongo-driver/mongo/options"
+)
+
const (
	// defaultWriteBatchSize is the default number of documents buffered per
	// bulk write when no WithWriteBatchSize option is given.
	defaultWriteBatchSize = 1000
	// defaultWriteOrdered makes bulk writes ordered by default.
	defaultWriteOrdered = true
)

// init registers the DoFns and emitted element types with the Beam SDK so
// they can be serialized and executed by distributed runners.
func init() {
	register.Function1x2(createIDFn)
	register.Emitter2[primitive.ObjectID, beam.Y]()

	register.DoFn3x0[context.Context, beam.Y, func(beam.X, beam.Y)](
		&extractIDFn{},
	)
	register.Emitter2[beam.X, beam.Y]()

	register.DoFn4x1[context.Context, beam.X, beam.Y, func(beam.X), error](
		&writeFn{},
	)
	register.Emitter1[primitive.ObjectID]()
}
+
+// Write writes a PCollection<T> of a type T to MongoDB. T must be a struct with exported fields
+// that should have a "bson" tag. If the struct has a field with the bson tag "_id", the value of
+// that field will be used as the id of the document. Otherwise, a new id field of type
+// primitive.ObjectID will be generated for each document. Write returns a PCollection<K> of the
+// inserted id values with type K.
+//
+// The Write transform has the required parameters:
+// - s: the scope of the pipeline
+// - uri: the MongoDB connection string
+// - database: the MongoDB database to write to
+// - collection: the MongoDB collection to write to
+// - col: the PCollection to write to MongoDB
+//
+// The Write transform takes a variadic number of WriteOptionFn which can set the WriteOption
+// fields:
+// - BatchSize: the number of documents to write in a single batch. Defaults to 1000
+// - Ordered: whether to execute the writes in order. Defaults to true
func Write(
	s beam.Scope,
	uri string,
	database string,
	collection string,
	col beam.PCollection,
	opts ...WriteOptionFn,
) beam.PCollection {
	s = s.Scope("mongodbio.Write")

	option := &WriteOption{
		BatchSize: defaultWriteBatchSize,
		Ordered:   defaultWriteOrdered,
	}

	// An option returning an error indicates a programming mistake, so fail
	// pipeline construction loudly.
	for _, opt := range opts {
		if err := opt(option); err != nil {
			panic(fmt.Sprintf("mongodbio.Write: invalid option: %v", err))
		}
	}

	t := col.Type().Type()
	// Look for a struct field tagged `bson:"_id"`; -1 means the type has none.
	idIndex := structx.FieldIndexByTag(t, bsonTag, "_id")

	var keyed beam.PCollection

	if idIndex == -1 {
		// No _id field: generate a fresh ObjectID per element. The reshuffle
		// materializes the generated IDs — NOTE(review): presumably so that
		// runner retries do not mint different IDs for the same element.
		pre := beam.ParDo(s, createIDFn, col)
		keyed = beam.Reshuffle(s, pre)
	} else {
		// Key each element by the value of its _id-tagged field.
		keyed = beam.ParDo(
			s,
			newExtractIDFn(idIndex),
			col,
			beam.TypeDefinition{Var: beam.XType, T: t.Field(idIndex).Type},
		)
	}

	return beam.ParDo(
		s,
		newWriteFn(uri, database, collection, option),
		keyed,
	)
}
+
+func createIDFn(elem beam.Y) (primitive.ObjectID, beam.Y) {
+ id := primitive.NewObjectID()
+ return id, elem
+}
+
// extractIDFn is a DoFn that keys each element by the value of its
// _id-tagged struct field, located by field index.
type extractIDFn struct {
	// IDIndex is the index of the struct field tagged `bson:"_id"`.
	IDIndex int
}

// newExtractIDFn creates an extractIDFn for the given field index.
func newExtractIDFn(idIndex int) *extractIDFn {
	return &extractIDFn{
		IDIndex: idIndex,
	}
}

// ProcessElement emits the element keyed by the value of its ID field.
func (fn *extractIDFn) ProcessElement(
	_ context.Context,
	elem beam.Y,
	emit func(beam.X, beam.Y),
) {
	// Reflectively read the ID field; elem is expected to be a struct value
	// (Write derives IDIndex from the collection's element type).
	id := reflect.ValueOf(elem).Field(fn.IDIndex).Interface()
	emit(id, elem)
}
+
// writeFn is a DoFn that buffers keyed documents and bulk-writes them to a
// MongoDB collection in batches.
type writeFn struct {
	mongoDBFn
	// BatchSize is the number of documents to buffer before flushing.
	BatchSize int64
	// Ordered controls whether the bulk write executes its models in order.
	Ordered bool
	// models is the per-bundle in-memory buffer of pending write models
	// (unexported, not serialized).
	models []mongo.WriteModel
}

// newWriteFn creates a writeFn from the connection parameters and the
// resolved write options.
func newWriteFn(
	uri string,
	database string,
	collection string,
	option *WriteOption,
) *writeFn {
	return &writeFn{
		mongoDBFn: mongoDBFn{
			URI:        uri,
			Database:   database,
			Collection: collection,
		},
		BatchSize: option.BatchSize,
		Ordered:   option.Ordered,
	}
}
+
// ProcessElement buffers a replace-with-upsert model for the element and
// flushes the buffer once it holds BatchSize models. The element's key is
// emitted when buffered; the actual write may not happen until a later
// flush or FinishBundle.
func (fn *writeFn) ProcessElement(
	ctx context.Context,
	key beam.X,
	value beam.Y,
	emit func(beam.X),
) error {
	// Upsert keyed on _id, so re-processing the same element replaces the
	// existing document rather than inserting a duplicate.
	model := mongo.NewReplaceOneModel().
		SetFilter(bson.M{"_id": key}).
		SetUpsert(true).
		SetReplacement(value)

	fn.models = append(fn.models, model)

	if len(fn.models) >= int(fn.BatchSize) {
		if err := fn.flush(ctx); err != nil {
			return err
		}
	}

	emit(key)

	return nil
}
+
+func (fn *writeFn) FinishBundle(ctx context.Context, _ func(beam.X)) error {
+ if len(fn.models) > 0 {
+ return fn.flush(ctx)
+ }
+
+ return nil
+}
+
// flush bulk-writes the buffered models to the collection and clears the
// buffer on success; on failure the buffer is left intact.
func (fn *writeFn) flush(ctx context.Context) error {
	opts := options.BulkWrite().SetOrdered(fn.Ordered)

	if _, err := fn.collection.BulkWrite(ctx, fn.models, opts); err != nil {
		return fmt.Errorf("error bulk writing to MongoDB: %w", err)
	}

	fn.models = nil

	return nil
}
diff --git a/sdks/go/pkg/beam/io/mongodbio/write_option.go b/sdks/go/pkg/beam/io/mongodbio/write_option.go
new file mode 100644
index 00000000000..8d54b6052b8
--- /dev/null
+++ b/sdks/go/pkg/beam/io/mongodbio/write_option.go
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mongodbio
+
+import (
+ "errors"
+)
+
// WriteOption represents options for writing to MongoDB.
type WriteOption struct {
	// BatchSize is the maximum number of documents included in each bulk
	// write.
	BatchSize int64
	// Ordered determines whether the bulk write executes its operations in
	// order, stopping at the first error.
	Ordered bool
}

// WriteOptionFn is a function that configures a WriteOption.
type WriteOptionFn func(option *WriteOption) error
+
+// WithWriteBatchSize configures the WriteOption to use the provided batch size when writing
+// documents.
+func WithWriteBatchSize(batchSize int64) WriteOptionFn {
+ return func(o *WriteOption) error {
+ if batchSize <= 0 {
+ return errors.New("batch size must be greater than 0")
+ }
+
+ o.BatchSize = batchSize
+ return nil
+ }
+}
+
// WithWriteOrdered configures the WriteOption whether to apply an ordered bulk write
// (operations run serially, stopping at the first error) or an unordered one.
func WithWriteOrdered(ordered bool) WriteOptionFn {
	return func(o *WriteOption) error {
		o.Ordered = ordered
		return nil
	}
}
diff --git a/sdks/go/pkg/beam/io/mongodbio/write_option_test.go b/sdks/go/pkg/beam/io/mongodbio/write_option_test.go
new file mode 100644
index 00000000000..1e4e66bfbc4
--- /dev/null
+++ b/sdks/go/pkg/beam/io/mongodbio/write_option_test.go
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mongodbio
+
+import (
+ "testing"
+)
+
// TestWithWriteBatchSize verifies that WithWriteBatchSize sets the batch size
// on a WriteOption and rejects non-positive values with an error.
func TestWithWriteBatchSize(t *testing.T) {
	tests := []struct {
		name      string
		batchSize int64
		want      int64
		wantErr   bool
	}{
		{
			name:      "Set batch size to 500",
			batchSize: 500,
			want:      500,
			wantErr:   false,
		},
		{
			name:      "Error - batch size must be greater than 0",
			batchSize: 0,
			wantErr:   true,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			var option WriteOption

			if err := WithWriteBatchSize(tt.batchSize)(&option); (err != nil) != tt.wantErr {
				t.Fatalf("WithWriteBatchSize() error = %v, wantErr %v", err, tt.wantErr)
			}

			// In the error case the zero value (0) is expected to remain.
			if option.BatchSize != tt.want {
				t.Errorf("option.BatchSize = %v, want %v", option.BatchSize, tt.want)
			}
		})
	}
}
+
// TestWithWriteOrdered verifies that WithWriteOrdered sets the Ordered flag on
// a WriteOption.
func TestWithWriteOrdered(t *testing.T) {
	tests := []struct {
		name    string
		ordered bool
		want    bool
		wantErr bool
	}{
		{
			name:    "Set ordered to true",
			ordered: true,
			want:    true,
			wantErr: false,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			var option WriteOption

			if err := WithWriteOrdered(tt.ordered)(&option); (err != nil) != tt.wantErr {
				t.Fatalf("WithWriteOrdered() err = %v, wantErr %v", err, tt.wantErr)
			}

			if option.Ordered != tt.want {
				t.Errorf("option.Ordered = %v, want %v", option.Ordered, tt.want)
			}
		})
	}
}
diff --git a/sdks/go/pkg/beam/io/mongodbio/write_test.go b/sdks/go/pkg/beam/io/mongodbio/write_test.go
new file mode 100644
index 00000000000..6608df8362b
--- /dev/null
+++ b/sdks/go/pkg/beam/io/mongodbio/write_test.go
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mongodbio
+
+import (
+ "testing"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "github.com/google/go-cmp/cmp"
+)
+
// Test_createIDFn verifies that createIDFn returns a non-zero object ID as
// the key and passes the element through unchanged as the value.
func Test_createIDFn(t *testing.T) {
	type doc struct {
		Field1 int32 `bson:"field1"`
	}

	tests := []struct {
		name string
		elem beam.Y
		want beam.Y
	}{
		{
			name: "Create key-value pair of a new object ID and element",
			elem: doc{Field1: 1},
			want: doc{Field1: 1},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			gotKey, gotValue := createIDFn(tt.elem)

			// The ID is random, so only assert that one was generated.
			if gotKey.IsZero() {
				t.Error("createIDFn() gotKey is zero")
			}

			if !cmp.Equal(gotValue, tt.want) {
				t.Errorf("createIDFn() gotValue = %v, want %v", gotValue, tt.want)
			}
		})
	}
}
diff --git a/sdks/go/pkg/beam/util/structx/struct.go b/sdks/go/pkg/beam/util/structx/struct.go
index 2659191d38d..aec8a63652d 100644
--- a/sdks/go/pkg/beam/util/structx/struct.go
+++ b/sdks/go/pkg/beam/util/structx/struct.go
@@ -56,3 +56,22 @@ func InferFieldNames(t reflect.Type, key string) []string {
return names
}
+
+// FieldIndexByTag returns the index of the field with the given tag key and value. Returns -1 if
+// the field is not found. Panics if the type's kind is not a struct.
+func FieldIndexByTag(t reflect.Type, key string, value string) int {
+ if t.Kind() != reflect.Struct {
+ panic(fmt.Sprintf("structx: FieldIndexByTag of non-struct type %s", t))
+ }
+
+ for i := 0; i < t.NumField(); i++ {
+ values := t.Field(i).Tag.Get(key)
+ name := strings.Split(values, ",")[0]
+
+ if name == value {
+ return i
+ }
+ }
+
+ return -1
+}
diff --git a/sdks/go/pkg/beam/util/structx/struct_test.go b/sdks/go/pkg/beam/util/structx/struct_test.go
index 6aac5869604..ab6c7278f62 100644
--- a/sdks/go/pkg/beam/util/structx/struct_test.go
+++ b/sdks/go/pkg/beam/util/structx/struct_test.go
@@ -142,3 +142,63 @@ func TestInferFieldNamesPanic(t *testing.T) {
InferFieldNames(reflect.TypeOf(""), "key")
})
}
+
// TestFieldIndexByTag verifies that FieldIndexByTag finds fields by tag key
// and value, and returns -1 when either the key or the value does not match.
func TestFieldIndexByTag(t *testing.T) {
	tests := []struct {
		name  string
		t     reflect.Type
		key   string
		value string
		want  int
	}{
		{
			name: "Return index of field with matching tag key and value",
			t: reflect.TypeOf(struct {
				Field1 string `key:"field1"`
				Field2 string `key:"field2"`
			}{}),
			key:   "key",
			value: "field2",
			want:  1,
		},
		{
			name: "Return -1 for non-existent tag key",
			t: reflect.TypeOf(struct {
				Field1 string `key:"field1"`
				Field2 string `key:"field2"`
			}{}),
			key:   "other",
			value: "field1",
			want:  -1,
		},
		{
			name: "Return -1 for non-existent tag value",
			t: reflect.TypeOf(struct {
				Field1 string `key:"field1"`
				Field2 string `key:"field2"`
			}{}),
			key:   "key",
			value: "field3",
			want:  -1,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if got := FieldIndexByTag(tt.t, tt.key, tt.value); got != tt.want {
				t.Errorf("FieldIndexByTag() = %v, want %v", got, tt.want)
			}
		})
	}
}
+
// TestFieldIndexByTagPanic verifies that FieldIndexByTag panics when given a
// non-struct type.
func TestFieldIndexByTagPanic(t *testing.T) {
	t.Run("Panic for non-struct type", func(t *testing.T) {
		defer func() {
			if r := recover(); r == nil {
				t.Errorf("FieldIndexByTag() does not panic")
			}
		}()

		FieldIndexByTag(reflect.TypeOf(""), "key", "field1")
	})
}
diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go
index 2253df597bb..c13d8b16692 100644
--- a/sdks/go/test/integration/integration.go
+++ b/sdks/go/test/integration/integration.go
@@ -223,6 +223,7 @@ var dataflowFilters = []string{
"TestJDBCIO_BasicReadWrite",
"TestJDBCIO_PostgresReadWrite",
"TestDebeziumIO_BasicRead",
+ "TestMongoDBIO.*",
// TODO(BEAM-11576): TestFlattenDup failing on this runner.
"TestFlattenDup",
// The Dataflow runner does not support the TestStream primitive
diff --git a/sdks/go/test/integration/internal/containers/containers.go b/sdks/go/test/integration/internal/containers/containers.go
new file mode 100644
index 00000000000..5987897fd41
--- /dev/null
+++ b/sdks/go/test/integration/internal/containers/containers.go
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package containers contains utilities for running test containers in integration tests.
+package containers
+
+import (
+ "context"
+ "testing"
+
+ "github.com/docker/go-connections/nat"
+ "github.com/testcontainers/testcontainers-go"
+)
+
// ContainerOptionFn is a function that configures a testcontainers container
// request.
type ContainerOptionFn func(*testcontainers.ContainerRequest)

// WithPorts configures the container request to expose the given ports, in
// testcontainers "port/protocol" form, e.g. "27017/tcp".
func WithPorts(ports []string) ContainerOptionFn {
	return func(option *testcontainers.ContainerRequest) {
		option.ExposedPorts = ports
	}
}
+
// NewContainer starts a test container from the given image, applying any
// options to the container request. A cleanup hook is registered so that the
// container is terminated when the test and its subtests complete. Fails the
// test immediately if the container cannot be created.
func NewContainer(
	ctx context.Context,
	t *testing.T,
	image string,
	opts ...ContainerOptionFn,
) testcontainers.Container {
	t.Helper()

	request := testcontainers.ContainerRequest{Image: image}

	for _, opt := range opts {
		opt(&request)
	}

	// Started: true makes GenericContainer both create and start the
	// container before returning.
	genericRequest := testcontainers.GenericContainerRequest{
		ContainerRequest: request,
		Started:          true,
	}

	container, err := testcontainers.GenericContainer(ctx, genericRequest)
	if err != nil {
		t.Fatalf("error creating container: %v", err)
	}

	t.Cleanup(func() {
		if err := container.Terminate(ctx); err != nil {
			t.Fatalf("error terminating container: %v", err)
		}
	})

	return container
}
+
// Port returns the host port mapped to the given container port, failing the
// test if the mapping cannot be resolved.
func Port(
	ctx context.Context,
	t *testing.T,
	container testcontainers.Container,
	port nat.Port,
) string {
	t.Helper()

	mappedPort, err := container.MappedPort(ctx, port)
	if err != nil {
		t.Fatalf("error getting mapped port: %v", err)
	}

	return mappedPort.Port()
}
diff --git a/sdks/go/test/integration/io/mongodbio/helper_test.go b/sdks/go/test/integration/io/mongodbio/helper_test.go
new file mode 100644
index 00000000000..751f9e56787
--- /dev/null
+++ b/sdks/go/test/integration/io/mongodbio/helper_test.go
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mongodbio
+
+import (
+ "context"
+ "testing"
+
+ "github.com/apache/beam/sdks/v2/go/test/integration/internal/containers"
+ "go.mongodb.org/mongo-driver/bson"
+ "go.mongodb.org/mongo-driver/bson/primitive"
+ "go.mongodb.org/mongo-driver/mongo"
+ "go.mongodb.org/mongo-driver/mongo/options"
+ "go.mongodb.org/mongo-driver/mongo/readpref"
+)
+
const (
	// mongoImage is the Docker image used for the MongoDB test container.
	mongoImage = "mongo:6.0.3"
	// mongoPort is the port MongoDB listens on inside the container.
	mongoPort = "27017"
)
+
+func setUpTestContainer(ctx context.Context, t *testing.T) string {
+ t.Helper()
+
+ container := containers.NewContainer(
+ ctx,
+ t,
+ mongoImage,
+ containers.WithPorts([]string{mongoPort + "/tcp"}),
+ )
+
+ return containers.Port(ctx, t, container, mongoPort)
+}
+
// objectIDFromHex parses a hex string into a primitive.ObjectID, failing the
// test on an invalid string.
func objectIDFromHex(t *testing.T, hex string) primitive.ObjectID {
	t.Helper()

	id, err := primitive.ObjectIDFromHex(hex)
	if err != nil {
		t.Fatalf("error parsing hex string to primitive.ObjectID: %v", err)
	}

	return id
}
+
// newClient connects a MongoDB client to the given URI and verifies the
// connection with a ping against the primary. A cleanup hook disconnects the
// client when the test completes. Fails the test on any connection error.
func newClient(ctx context.Context, t *testing.T, uri string) *mongo.Client {
	t.Helper()

	opts := options.Client().ApplyURI(uri)

	client, err := mongo.Connect(ctx, opts)
	if err != nil {
		t.Fatalf("error connecting to MongoDB: %v", err)
	}

	t.Cleanup(func() {
		if err := client.Disconnect(ctx); err != nil {
			t.Fatalf("error disconnecting from MongoDB: %v", err)
		}
	})

	// mongo.Connect does not block until the server is reachable; ping to
	// confirm the container is actually up before the test proceeds.
	if err := client.Ping(ctx, readpref.Primary()); err != nil {
		t.Fatalf("error pinging MongoDB: %v", err)
	}

	return client
}
+
// dropCollection drops the given collection, failing the test on error.
func dropCollection(ctx context.Context, t *testing.T, collection *mongo.Collection) {
	t.Helper()

	if err := collection.Drop(ctx); err != nil {
		t.Fatalf("error dropping collection: %v", err)
	}
}
+
// readDocuments returns all documents in the collection decoded as bson.M
// maps, failing the test on any query or decode error.
func readDocuments(
	ctx context.Context,
	t *testing.T,
	collection *mongo.Collection,
) []bson.M {
	t.Helper()

	// An empty filter matches every document in the collection.
	cursor, err := collection.Find(ctx, bson.M{})
	if err != nil {
		t.Fatalf("error finding documents: %v", err)
	}

	var documents []bson.M
	if err = cursor.All(ctx, &documents); err != nil {
		t.Fatalf("error decoding documents: %v", err)
	}

	return documents
}
+
// writeDocuments inserts the given documents into the collection, failing the
// test on error.
func writeDocuments(
	ctx context.Context,
	t *testing.T,
	collection *mongo.Collection,
	documents []any,
) {
	t.Helper()

	if _, err := collection.InsertMany(ctx, documents); err != nil {
		t.Fatalf("error inserting documents: %v", err)
	}
}
diff --git a/sdks/go/test/integration/io/mongodbio/mongodbio_test.go b/sdks/go/test/integration/io/mongodbio/mongodbio_test.go
new file mode 100644
index 00000000000..b8885e7c728
--- /dev/null
+++ b/sdks/go/test/integration/io/mongodbio/mongodbio_test.go
@@ -0,0 +1,237 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mongodbio
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "reflect"
+ "testing"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/mongodbio"
+ _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow"
+ _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink"
+ _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/samza"
+ _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/spark"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+ "github.com/apache/beam/sdks/v2/go/test/integration"
+ "github.com/google/go-cmp/cmp"
+ "go.mongodb.org/mongo-driver/bson"
+ "go.mongodb.org/mongo-driver/bson/primitive"
+)
+
// init registers the test document types with the Beam type registry so that
// runners can encode and decode them.
func init() {
	beam.RegisterType(reflect.TypeOf((*docWithObjectID)(nil)).Elem())
	beam.RegisterType(reflect.TypeOf((*docWithStringID)(nil)).Elem())
}
+
// docWithObjectID is a test document whose "_id" is a MongoDB object ID.
type docWithObjectID struct {
	ID     primitive.ObjectID `bson:"_id"`
	Field1 int32              `bson:"field1"`
}

// docWithStringID is a test document whose "_id" is a plain string.
type docWithStringID struct {
	ID     string `bson:"_id"`
	Field1 int32  `bson:"field1"`
}
+
// TestMongoDBIO_Read seeds a MongoDB test container with documents and
// verifies that mongodbio.Read decodes them into the expected struct values,
// with and without filters and the bucketAuto aggregation.
func TestMongoDBIO_Read(t *testing.T) {
	integration.CheckFilters(t)

	ctx := context.Background()
	port := setUpTestContainer(ctx, t)
	uri := fmt.Sprintf("mongodb://%s:%s", "localhost", port)

	tests := []struct {
		name    string
		input   []any
		t       reflect.Type
		options []mongodbio.ReadOptionFn
		want    []any
	}{
		{
			name: "Read documents from MongoDB with id of type primitive.ObjectID",
			input: []any{
				bson.M{"_id": objectIDFromHex(t, "61cf9980dd2d24dc5cf28620"), "field1": int32(0)},
				bson.M{"_id": objectIDFromHex(t, "61cf9980dd2d24dc5cf28621"), "field1": int32(1)},
				bson.M{"_id": objectIDFromHex(t, "61cf9980dd2d24dc5cf28622"), "field1": int32(2)},
			},
			t: reflect.TypeOf(docWithObjectID{}),
			want: []any{
				docWithObjectID{ID: objectIDFromHex(t, "61cf9980dd2d24dc5cf28620"), Field1: 0},
				docWithObjectID{ID: objectIDFromHex(t, "61cf9980dd2d24dc5cf28621"), Field1: 1},
				docWithObjectID{ID: objectIDFromHex(t, "61cf9980dd2d24dc5cf28622"), Field1: 2},
			},
		},
		{
			name: "Read documents from MongoDB with id of type string",
			input: []any{
				bson.M{"_id": "id01", "field1": int32(0)},
				bson.M{"_id": "id02", "field1": int32(1)},
				bson.M{"_id": "id03", "field1": int32(2)},
			},
			t: reflect.TypeOf(docWithStringID{}),
			want: []any{
				docWithStringID{ID: "id01", Field1: 0},
				docWithStringID{ID: "id02", Field1: 1},
				docWithStringID{ID: "id03", Field1: 2},
			},
		},
		{
			name: "Read documents from MongoDB where filter matches",
			input: []any{
				bson.M{"_id": objectIDFromHex(t, "61cf9980dd2d24dc5cf28620"), "field1": int32(0)},
				bson.M{"_id": objectIDFromHex(t, "61cf9980dd2d24dc5cf28621"), "field1": int32(1)},
				bson.M{"_id": objectIDFromHex(t, "61cf9980dd2d24dc5cf28622"), "field1": int32(2)},
			},
			t: reflect.TypeOf(docWithObjectID{}),
			options: []mongodbio.ReadOptionFn{
				mongodbio.WithReadFilter(bson.M{"field1": bson.M{"$gt": 0}}),
			},
			// Only documents with field1 > 0 should be read.
			want: []any{
				docWithObjectID{ID: objectIDFromHex(t, "61cf9980dd2d24dc5cf28621"), Field1: 1},
				docWithObjectID{ID: objectIDFromHex(t, "61cf9980dd2d24dc5cf28622"), Field1: 2},
			},
		},
		{
			name: "Read documents from MongoDB with bucketAuto aggregation",
			input: []any{
				bson.M{"_id": objectIDFromHex(t, "61cf9980dd2d24dc5cf28620"), "field1": int32(0)},
				bson.M{"_id": objectIDFromHex(t, "61cf9980dd2d24dc5cf28621"), "field1": int32(1)},
				bson.M{"_id": objectIDFromHex(t, "61cf9980dd2d24dc5cf28622"), "field1": int32(2)},
			},
			t: reflect.TypeOf(docWithObjectID{}),
			options: []mongodbio.ReadOptionFn{
				mongodbio.WithReadBucketAuto(true),
			},
			want: []any{
				docWithObjectID{ID: objectIDFromHex(t, "61cf9980dd2d24dc5cf28620"), Field1: 0},
				docWithObjectID{ID: objectIDFromHex(t, "61cf9980dd2d24dc5cf28621"), Field1: 1},
				docWithObjectID{ID: objectIDFromHex(t, "61cf9980dd2d24dc5cf28622"), Field1: 2},
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			database := "db"
			collection := "coll"

			client := newClient(ctx, t, uri)
			mongoCollection := client.Database(database).Collection(collection)

			// Drop the collection after each case so that cases do not see
			// each other's documents.
			t.Cleanup(func() {
				dropCollection(ctx, t, mongoCollection)
			})

			writeDocuments(ctx, t, mongoCollection, tt.input)

			p, s := beam.NewPipelineWithRoot()

			got := mongodbio.Read(s, uri, database, collection, tt.t, tt.options...)

			passert.Equals(s, got, tt.want...)
			ptest.RunAndValidate(t, p)
		})
	}
}
+
// TestMongoDBIO_Write runs mongodbio.Write against a MongoDB test container
// and verifies both the IDs emitted by the transform and the documents that
// end up in the collection.
func TestMongoDBIO_Write(t *testing.T) {
	integration.CheckFilters(t)

	ctx := context.Background()
	port := setUpTestContainer(ctx, t)
	uri := fmt.Sprintf("mongodb://%s:%s", "localhost", port)

	tests := []struct {
		name     string
		input    []any
		options  []mongodbio.WriteOptionFn
		wantIDs  []any
		wantDocs []bson.M
	}{
		{
			name: "Write documents to MongoDB with id of type primitive.ObjectID",
			input: []any{
				docWithObjectID{ID: objectIDFromHex(t, "61cf9980dd2d24dc5cf28620"), Field1: 0},
				docWithObjectID{ID: objectIDFromHex(t, "61cf9980dd2d24dc5cf28621"), Field1: 1},
				docWithObjectID{ID: objectIDFromHex(t, "61cf9980dd2d24dc5cf28622"), Field1: 2},
			},
			wantIDs: []any{
				objectIDFromHex(t, "61cf9980dd2d24dc5cf28620"),
				objectIDFromHex(t, "61cf9980dd2d24dc5cf28621"),
				objectIDFromHex(t, "61cf9980dd2d24dc5cf28622"),
			},
			wantDocs: []bson.M{
				{"_id": objectIDFromHex(t, "61cf9980dd2d24dc5cf28620"), "field1": int32(0)},
				{"_id": objectIDFromHex(t, "61cf9980dd2d24dc5cf28621"), "field1": int32(1)},
				{"_id": objectIDFromHex(t, "61cf9980dd2d24dc5cf28622"), "field1": int32(2)},
			},
		},
		{
			name: "Write documents to MongoDB with id of type string",
			input: []any{
				docWithStringID{ID: "id01", Field1: 0},
				docWithStringID{ID: "id02", Field1: 1},
				docWithStringID{ID: "id03", Field1: 2},
			},
			wantIDs: []any{
				"id01",
				"id02",
				"id03",
			},
			wantDocs: []bson.M{
				{"_id": "id01", "field1": int32(0)},
				{"_id": "id02", "field1": int32(1)},
				{"_id": "id03", "field1": int32(2)},
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			database := "db"
			collection := "coll"

			client := newClient(ctx, t, uri)
			mongoCollection := client.Database(database).Collection(collection)

			t.Cleanup(func() {
				dropCollection(ctx, t, mongoCollection)
			})

			p, s := beam.NewPipelineWithRoot()

			col := beam.CreateList(s, tt.input)
			gotIDs := mongodbio.Write(s, uri, database, collection, col, tt.options...)

			passert.Equals(s, gotIDs, tt.wantIDs...)
			ptest.RunAndValidate(t, p)

			// Read back directly via the driver to check what was actually
			// persisted, independent of the pipeline's emitted IDs.
			if gotDocs := readDocuments(ctx, t, mongoCollection); !cmp.Equal(gotDocs, tt.wantDocs) {
				t.Errorf("readDocuments() = %v, want %v", gotDocs, tt.wantDocs)
			}
		})
	}
}
+
+func TestMain(m *testing.M) {
+ flag.Parse()
+ beam.Init()
+
+ ptest.MainRet(m)
+}