You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/09/29 16:12:59 UTC

[GitHub] [beam] AnandInguva commented on a diff in pull request #22400: [MongoDBio] custom writeFn for mongodb io python sdk

AnandInguva commented on code in PR #22400:
URL: https://github.com/apache/beam/pull/22400#discussion_r983738639


##########
sdks/python/.python-version:
##########
@@ -0,0 +1 @@
+3.7.13

Review Comment:
   Was this file added accidentally?



##########
sdks/python/apache_beam/io/mongodbio.py:
##########
@@ -772,36 +789,46 @@ def display_data(self):
 
 
 class _MongoSink:
-  def __init__(self, uri=None, db=None, coll=None, extra_params=None):
+  def __init__(
+      self, uri=None, db=None, coll=None, extra_params=None, writeFn=None):
     if extra_params is None:
       extra_params = {}
     self.uri = uri
     self.db = db
     self.coll = coll
     self.spec = extra_params
     self.client = None
+    self.writeFn = writeFn
+    if writeFn is None:
+      self.writeFn = self._defaultWriteFn
+
+  @staticmethod
+  def _defaultWriteFn(client, db, coll, documents, logger):
+    """to gain control over the write process,
+    a user could for example, change ReplaceOne into UpdateOne
+    a user could implement their own WriteFn, using this as a template,
+    notice that the 'self' argument should be ommited
+    """
+    requests = []
+    for doc in documents:
+      request = ReplaceOne(
+          filter={"_id": doc.get("_id", None)}, replacement=doc, upsert=True)
+      requests.append(request)
+      resp = client[db][coll].bulk_write(requests)
+      # set logger to debug level to log the response
+      logger.debug(
+          "BulkWrite to MongoDB result in nModified:%d, nUpserted:%d, "
+          "nMatched:%d, Errors:%s" % (
+              resp.modified_count,
+              resp.upserted_count,
+              resp.matched_count,
+              resp.bulk_api_result.get("writeErrors"),
+          ))
 
   def write(self, documents):
     if self.client is None:
       self.client = MongoClient(host=self.uri, **self.spec)
-    requests = []
-    for doc in documents:
-      # match document based on _id field, if not found in current collection,
-      # insert new one, otherwise overwrite it.
-      requests.append(
-          ReplaceOne(
-              filter={"_id": doc.get("_id", None)},
-              replacement=doc,
-              upsert=True))
-    resp = self.client[self.db][self.coll].bulk_write(requests)
-    _LOGGER.debug(
-        "BulkWrite to MongoDB result in nModified:%d, nUpserted:%d, "
-        "nMatched:%d, Errors:%s" % (
-            resp.modified_count,
-            resp.upserted_count,
-            resp.matched_count,
-            resp.bulk_api_result.get("writeErrors"),
-        ))
+    self.writeFn(self.client, self.db, self.coll, documents, _LOGGER)

Review Comment:
   For the custom function, can we document in the docstring which parameters the callable is expected to accept? It is easy to run into errors if the positional arguments on the user side differ from the ones used here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org