Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/07/26 21:10:04 UTC

[GitHub] [beam] NagisaVon commented on a diff in pull request #22400: [MongoDBio] custom writeFn for mongodb io python sdk

NagisaVon commented on code in PR #22400:
URL: https://github.com/apache/beam/pull/22400#discussion_r930414401


##########
sdks/python/apache_beam/io/mongodbio.py:
##########
@@ -709,6 +714,36 @@ def __init__(
     self._coll = coll
     self._batch_size = batch_size
     self._spec = extra_client_params
+    self._writeFn = writeFn
+    if writeFn is None:
+        self._writeFn = self._defaultWriteFn
+
+
+  """To gain control over the write process, a user can supply their own
+  writeFn, for example one that uses UpdateOne instead of ReplaceOne.
+  This method can serve as a template; note that the 'self' argument
+  should be omitted from a user-supplied writeFn.
+  """
+  def _defaultWriteFn(self, client, db, coll, documents, logger):
+      requests = []
+      for doc in documents:
+          request = ReplaceOne(
+              filter={"_id": doc.get("_id", None)},
+              replacement=doc,
+              upsert=True)
+          requests.append(request)
+      resp = client[db][coll].bulk_write(requests)
+      # set logger to debug level to log the response
+      logger.debug(
+          "BulkWrite to MongoDB result in nModified:%d, nUpserted:%d, "
+          "nMatched:%d, Errors:%s" % (
+              resp.modified_count,
+              resp.upserted_count,
+              resp.matched_count,
+              resp.bulk_api_result.get("writeErrors"),
+          ))
+      return resp

Review Comment:
   Nope, getting rid of it since the logging is handled by the writeFn. 
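The docstring's note about omitting 'self' follows from how the default is stored. `self._defaultWriteFn` is a bound method that already carries `self`, so it takes the same five arguments (client, db, coll, documents, logger) as a plain user function, and both can be called identically. The sketch below illustrates this under stated assumptions. `SinkSketch`, `FakeCollection`, and `set_fields_write_fn` are hypothetical names, not part of Beam. Tuples stand in for pymongo's `ReplaceOne`/`UpdateOne` request objects and a dict stands in for the client, so it runs without pymongo or a MongoDB server.

```python
import logging
from typing import Any, Callable, List, Optional

# Hypothetical signature mirroring the diff: (client, db, coll, documents, logger).
WriteFn = Callable[[Any, str, str, List[dict], logging.Logger], Any]


class FakeCollection:
    """In-memory stand-in for a MongoDB collection (hypothetical)."""

    def __init__(self):
        self.calls = []

    def bulk_write(self, requests):
        # Record each batch instead of sending it to a server.
        self.calls.append(list(requests))
        return len(requests)


class SinkSketch:
    """Hypothetical sink showing how a writeFn default could be resolved."""

    def __init__(self, db: str, coll: str, writeFn: Optional[WriteFn] = None):
        self._db = db
        self._coll = coll
        # Fall back to the bound default method; being bound, it already
        # has `self`, so it matches the user-function signature.
        self._writeFn = writeFn if writeFn is not None else self._defaultWriteFn

    def _defaultWriteFn(self, client, db, coll, documents, logger):
        # Stand-in for ReplaceOne(filter={"_id": ...}, replacement=doc, upsert=True).
        requests = [("replace", {"_id": d.get("_id")}, d) for d in documents]
        return client[db][coll].bulk_write(requests)

    def write(self, client, documents, logger):
        # The default and a custom writeFn are called the same way.
        return self._writeFn(client, self._db, self._coll, documents, logger)


def set_fields_write_fn(client, db, coll, documents, logger):
    """User-supplied writeFn: a plain function, so no `self` parameter."""
    # Stand-in for UpdateOne(filter={"_id": ...}, update={"$set": ...}, upsert=True);
    # `_id` is left out of $set because it is immutable in MongoDB.
    requests = [
        ("update", {"_id": d["_id"]},
         {"$set": {k: v for k, v in d.items() if k != "_id"}})
        for d in documents
    ]
    resp = client[db][coll].bulk_write(requests)
    logger.debug("wrote %d requests", resp)
    return resp
```

Usage: `SinkSketch("mydb", "mycoll").write(client, docs, logger)` uses the default, while `SinkSketch("mydb", "mycoll", set_fields_write_fn)` swaps in the custom function. In a real pipeline the tuples would be pymongo request objects and the dict a `MongoClient`; whether the merged PR keeps exactly this signature is not confirmed here.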



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org