Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/06/02 03:12:48 UTC

[GitHub] [beam] pareshsarafmdb opened a new pull request #14927: MongoDBIO support for update within documents

pareshsarafmdb opened a new pull request #14927:
URL: https://github.com/apache/beam/pull/14927


   The current MongoDBIO connector only supports inserting and replacing documents in a collection. Pipelines often need to update an existing document instead: adding a new field, updating an existing field, pushing into an array or set, and so on. This PR adds support for such updates.
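For readers less familiar with MongoDB's update operators, the snippet below is a simplified in-memory model of what `$set`, `$push` and `$addToSet` do to a stored document. It assumes top-level fields only and uses plain `Map`s instead of BSON documents; it is not the MongoDB driver or the code in this PR, and `UpdateOperatorModel` is a made-up name for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Simplified in-memory model of three MongoDB update operators, applied to
 * top-level fields of a document represented as a Map. Real MongoDB also
 * supports dotted paths, modifiers such as $each, and many more operators.
 */
final class UpdateOperatorModel {

  private UpdateOperatorModel() {}

  /** $set: creates the field if it is missing, otherwise overwrites its value. */
  static void set(Map<String, Object> doc, String field, Object value) {
    doc.put(field, value);
  }

  /** $push: appends to an array field, creating the array if it is missing. */
  @SuppressWarnings("unchecked")
  static void push(Map<String, Object> doc, String field, Object value) {
    // Like MongoDB, this fails if the existing field is not an array (here: ClassCastException).
    List<Object> array = (List<Object>) doc.computeIfAbsent(field, k -> new ArrayList<>());
    array.add(value);
  }

  /** $addToSet: appends only if an equal value is not already in the array. */
  @SuppressWarnings("unchecked")
  static void addToSet(Map<String, Object> doc, String field, Object value) {
    List<Object> array = (List<Object>) doc.computeIfAbsent(field, k -> new ArrayList<>());
    if (!array.contains(value)) {
      array.add(value);
    }
  }

  public static void main(String[] args) {
    Map<String, Object> account = new LinkedHashMap<>();
    account.put("accId", "A1");
    set(account, "balance", 100); // adds a new field
    set(account, "balance", 250); // overwrites the existing field
    push(account, "transactions", "t1");
    push(account, "transactions", "t1"); // $push keeps duplicates
    addToSet(account, "tags", "vip");
    addToSet(account, "tags", "vip"); // $addToSet skips duplicates
    System.out.println(account);
    // {accId=A1, balance=250, transactions=[t1, t1], tags=[vip]}
  }
}
```

The difference between "pushing into an array" and "pushing into a set" from the description is exactly the `$push` versus `$addToSet` duplicate handling shown above.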
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pabloem commented on pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #14927:
URL: https://github.com/apache/beam/pull/14927#issuecomment-908600217


   The test failures are unrelated (at least the Java Examples failures). Reviewing this is on me; sorry about the delay again.





[GitHub] [beam] pareshsarafmdb commented on a change in pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
pareshsarafmdb commented on a change in pull request #14927:
URL: https://github.com/apache/beam/pull/14927#discussion_r663318281



##########
File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
##########
@@ -910,15 +973,24 @@ public void startBundle() {
       public void processElement(ProcessContext ctx) {
         // Need to copy the document because mongoCollection.insertMany() will mutate it
         // before inserting (will assign an id).
+
         batch.add(new Document(ctx.element()));
         if (batch.size() >= spec.batchSize()) {
-          flush();
+          if (spec.isUpdate()) {

Review comment:
       @aromanenko-dev @pabloem I have checked in the changes. All update-related settings are now wrapped in an UpdateConfiguration object. Sample below:

           MongoDbIO.write()
               .withUri(options.getMongoDBUri())
               .withDatabase(options.getDatabase())
               .withCollection(options.getCollection())
               .withUpdateConfiguration(
                   UpdateConfiguration.create()
                       .withUpdateKey("accId")
                       .withUpdateFields(
                           UpdateField.of("$set", "category", "category"),
                           UpdateField.of("$set", "balance", "balance"),
                           UpdateField.of("$push", "transactions")))

       The last update, a $push, uses the two-argument form of UpdateField.of, so the destination field (transactions) is updated with the full input document. Please review and let me know.
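To make the sample concrete, here is a sketch of the MongoDB filter and update documents such a configuration could produce for one input element. It assumes the update key becomes an equality filter and that the entries are grouped by operator, which is the shape `MongoCollection.updateOne(filter, update)` expects; the PR's actual mapping may differ. Plain `Map`s stand in for `org.bson.Document`, and `FieldSpec` and `UpdateDocumentSketch` are hypothetical names, not classes from this PR.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for one UpdateField entry: operator, source field, destination field. */
final class FieldSpec {
  final String operator;
  final String sourceField; // null means "use the whole input document"
  final String destField;

  FieldSpec(String operator, String sourceField, String destField) {
    this.operator = operator;
    this.sourceField = sourceField;
    this.destField = destField;
  }
}

/** Builds the filter and update documents that a configuration like the sample could map to. */
final class UpdateDocumentSketch {

  private UpdateDocumentSketch() {}

  /** Equality filter on the update key, e.g. {accId=A1}. */
  static Map<String, Object> filter(Map<String, Object> input, String updateKey) {
    Map<String, Object> filter = new LinkedHashMap<>();
    filter.put(updateKey, input.get(updateKey));
    return filter;
  }

  /** Groups entries by operator, e.g. {$set={...}, $push={...}}. */
  static Map<String, Map<String, Object>> update(Map<String, Object> input, List<FieldSpec> specs) {
    Map<String, Map<String, Object>> byOperator = new LinkedHashMap<>();
    for (FieldSpec spec : specs) {
      // A null source field selects the entire input document as the value.
      Object value = spec.sourceField == null ? input : input.get(spec.sourceField);
      byOperator.computeIfAbsent(spec.operator, k -> new LinkedHashMap<>()).put(spec.destField, value);
    }
    return byOperator;
  }

  public static void main(String[] args) {
    Map<String, Object> input = new LinkedHashMap<>();
    input.put("accId", "A1");
    input.put("category", "gold");
    input.put("balance", 250);
    List<FieldSpec> specs =
        Arrays.asList(
            new FieldSpec("$set", "category", "category"),
            new FieldSpec("$set", "balance", "balance"),
            new FieldSpec("$push", null, "transactions"));
    System.out.println(filter(input, "accId"));
    // {accId=A1}
    System.out.println(update(input, specs));
    // {$set={category=gold, balance=250}, $push={transactions={accId=A1, category=gold, balance=250}}}
  }
}
```

In a real writer, each element's filter/update pair would more likely be batched as `UpdateOneModel`s and sent with `MongoCollection.bulkWrite`, mirroring how inserts are already batched by `batchSize`.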







[GitHub] [beam] pabloem commented on a change in pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #14927:
URL: https://github.com/apache/beam/pull/14927#discussion_r686224696



##########
File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/UpdateField.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import java.io.Serializable;
+
+public class UpdateField implements Serializable {

Review comment:
       This class could also be an AutoValue class, maybe?

##########
File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/UpdateField.java
##########
@@ -0,0 +1,57 @@
+package org.apache.beam.sdk.io.mongodb;
+
+import java.io.Serializable;
+
+public class UpdateField implements Serializable {
+
+  private String updateOperator;
+
+  private String sourceField;
+
+  private String destField;

Review comment:
       These fields can be final, I suppose?
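Both suggestions point toward an immutable value class. AutoValue generates `equals`, `hashCode` and `toString` from an abstract class annotated with `@AutoValue`; the sketch below writes the equivalent by hand with `final` fields so it runs without the annotation processor. `ImmutableUpdateField` is a hypothetical name, and treating a null `sourceField` as "whole input document" follows the two-argument `of(...)` factory in the diff.

```java
import java.io.Serializable;
import java.util.Objects;

/** Hand-written immutable equivalent of what an AutoValue UpdateField could look like. */
final class ImmutableUpdateField implements Serializable {
  private static final long serialVersionUID = 1L;

  private final String updateOperator;
  private final String sourceField; // may be null: use the whole input document
  private final String destField;

  private ImmutableUpdateField(String updateOperator, String sourceField, String destField) {
    this.updateOperator = Objects.requireNonNull(updateOperator, "updateOperator");
    this.sourceField = sourceField;
    this.destField = Objects.requireNonNull(destField, "destField");
  }

  static ImmutableUpdateField of(String updateOperator, String sourceField, String destField) {
    return new ImmutableUpdateField(updateOperator, sourceField, destField);
  }

  String updateOperator() {
    return updateOperator;
  }

  String sourceField() {
    return sourceField;
  }

  String destField() {
    return destField;
  }

  // Value semantics: two instances with the same three properties are equal.
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof ImmutableUpdateField)) {
      return false;
    }
    ImmutableUpdateField other = (ImmutableUpdateField) o;
    return updateOperator.equals(other.updateOperator)
        && Objects.equals(sourceField, other.sourceField)
        && destField.equals(other.destField);
  }

  @Override
  public int hashCode() {
    return Objects.hash(updateOperator, sourceField, destField);
  }

  @Override
  public String toString() {
    return "UpdateField{" + updateOperator + ", " + sourceField + " -> " + destField + "}";
  }
}
```

Final fields also make the object safe to share once it has been deserialized on a worker, since nothing can mutate it after construction.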

##########
File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/UpdateField.java
##########
@@ -0,0 +1,57 @@
+package org.apache.beam.sdk.io.mongodb;
+
+import java.io.Serializable;
+
+public class UpdateField implements Serializable {
+
+  private String updateOperator;
+
+  private String sourceField;
+
+  private String destField;
+
+  public UpdateField(String updateOperator, String sourceField, String destField) {
+    this.updateOperator = updateOperator;
+    this.sourceField = sourceField;
+    this.destField = destField;
+  }
+
+  /** for updating field by field. */
+  public static UpdateField of(String updateOperator, String sourceField, String destField) {
+    return new UpdateField(updateOperator, sourceField, destField);
+  }
+
+  /** for updating with entire input document. */
+  public static UpdateField of(String updateOperator, String destField) {
+    return new UpdateField(updateOperator, null, destField);

Review comment:
       Is this used whenever we create a new field? Or whenever the source and dest fields have the same name?

Review comment:
       I wonder if it makes sense to add an option for when the source and destination fields have the same name :)







[GitHub] [beam] pareshsarafmdb commented on a change in pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
pareshsarafmdb commented on a change in pull request #14927:
URL: https://github.com/apache/beam/pull/14927#discussion_r689446518



##########
File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/UpdateField.java
##########
@@ -0,0 +1,57 @@
+package org.apache.beam.sdk.io.mongodb;
+
+import java.io.Serializable;
+
+public class UpdateField implements Serializable {
+
+  private String updateOperator;
+
+  private String sourceField;
+
+  private String destField;
+
+  public UpdateField(String updateOperator, String sourceField, String destField) {
+    this.updateOperator = updateOperator;
+    this.sourceField = sourceField;
+    this.destField = destField;
+  }
+
+  /** for updating field by field. */
+  public static UpdateField of(String updateOperator, String sourceField, String destField) {
+    return new UpdateField(updateOperator, sourceField, destField);
+  }
+
+  /** for updating with entire input document. */
+  public static UpdateField of(String updateOperator, String destField) {
+    return new UpdateField(updateOperator, null, destField);

Review comment:
       This is used when we need to write the full source document into the destination document, rather than updating specific fields. To update specific fields, we need to specify which field in the source maps to which field in the destination document. Do you think there is a better way?
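One possible answer to the naming question, sketched under the assumption that the two `of(...)` overloads are replaced by descriptively named factories: one for full-document updates, one for field-to-field updates, and a shorthand for the same-name case suggested in the review. `NamedUpdateField`, `fullUpdate`, `fieldUpdate` and `sameFieldUpdate` are hypothetical names, not part of this PR.

```java
/**
 * Hypothetical alternative to overloading UpdateField.of(...) with two and
 * three arguments: one descriptively named factory per use case.
 */
final class NamedUpdateField {
  final String updateOperator;
  final String sourceField; // null: write the whole input document
  final String destField;

  private NamedUpdateField(String updateOperator, String sourceField, String destField) {
    this.updateOperator = updateOperator;
    this.sourceField = sourceField;
    this.destField = destField;
  }

  /** Writes the entire input document into destField, e.g. $push onto an array. */
  static NamedUpdateField fullUpdate(String updateOperator, String destField) {
    return new NamedUpdateField(updateOperator, null, destField);
  }

  /** Copies sourceField of the input document into destField of the stored document. */
  static NamedUpdateField fieldUpdate(String updateOperator, String sourceField, String destField) {
    return new NamedUpdateField(updateOperator, sourceField, destField);
  }

  /** Shorthand for fieldUpdate when the source and destination fields share a name. */
  static NamedUpdateField sameFieldUpdate(String updateOperator, String field) {
    return fieldUpdate(updateOperator, field, field);
  }

  /** True when the whole input document, not a single field, is the value. */
  boolean isFullUpdate() {
    return sourceField == null;
  }
}
```

With this shape, `UpdateField.of("$set", "category", "category")` from the earlier sample would read `sameFieldUpdate("$set", "category")`, and the two-argument `$push` would read `fullUpdate("$push", "transactions")`, so the intent no longer depends on argument count.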







[GitHub] [beam] pabloem commented on a change in pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #14927:
URL: https://github.com/apache/beam/pull/14927#discussion_r652150217



##########
File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
##########
@@ -856,6 +898,22 @@ public Write withBatchSize(long batchSize) {
       return builder().setBatchSize(batchSize).build();
     }
 
+    public Write withIsUpdate(boolean isUpdate) {
+      return builder().setIsUpdate(isUpdate).build();
+    }
+
+    public Write withUpdateKey(String updateKey) {
+      return builder().setUpdateKey(updateKey).build();
+    }
+
+    public Write withUpdateOperator(String updateOperator) {
+      return builder().setUpdateOperator(updateOperator).build();
+    }
+
+    public Write withUpdateField(String updateField) {
+      return builder().setUpdateField(updateField).build();
+    }
+

Review comment:
       Please also add Javadoc to each public method.

##########
File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
##########
@@ -877,6 +935,11 @@ public void populateDisplayData(DisplayData.Builder builder) {
       builder.add(DisplayData.item("database", database()));
       builder.add(DisplayData.item("collection", collection()));
       builder.add(DisplayData.item("batchSize", batchSize()));
+      // builder.add(DisplayData.item("isUpdate", isUpdate()));
+      // builder.add(DisplayData.item("updateKey", updateKey()));
+      // builder.add(DisplayData.item("updateOperator", updateOperator()));
+      // builder.add(Data.item("updateOptions", updateOptions()));
+      // builder.add(DisplayData.item("updateField", updateField()));

Review comment:
       Uncomment these? I suppose you could check that they're not null first and then uncomment them.
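Beam's `DisplayData.Builder` has an `addIfNotNull` method that fits this suggestion, for example `builder.addIfNotNull(DisplayData.item("updateKey", updateKey()))`. The self-contained sketch below only mimics that null-skipping pattern with a plain map. `DisplayDataSketch` is a hypothetical stand-in, not the Beam class.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for a display-data builder that skips unset (null) values. */
final class DisplayDataSketch {
  private final Map<String, Object> items = new LinkedHashMap<>();

  /** Records the item only when its value is non-null, mirroring the addIfNotNull idea. */
  DisplayDataSketch addIfNotNull(String key, Object value) {
    if (value != null) {
      items.put(key, value);
    }
    return this;
  }

  Map<String, Object> items() {
    return items;
  }

  public static void main(String[] args) {
    String updateKey = "id"; // configured, so it is shown
    String updateOperator = null; // not configured, so it is skipped
    DisplayDataSketch builder =
        new DisplayDataSketch()
            .addIfNotNull("updateKey", updateKey)
            .addIfNotNull("updateOperator", updateOperator);
    System.out.println(builder.items());
  }
}
```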

##########
File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
##########
@@ -106,6 +109,28 @@
  *     .withNumSplits(30))
  *
  * }</pre>
+ *
+ * *
+ *
+ * <p>To configure a MongoDB sink and update, you must specify a connection {@code URI}, a {@code
+ * Database} * name, a {@code Collection} name. It matches the key with _id in target collection.
+ * For instance: * *
+ *
+ * <pre>{@code
+ * * pipeline
+ * *   .apply(...)
+ * *   .apply(MongoDbIO.write()
+ * *     .withUri("mongodb://localhost:27017")
+ * *     .withDatabase("my-database")
+ * *     .withCollection("my-collection")
+ * *     .withIsUpdate(true)
+ * *     .withUpdateKey("key-to-match")
+ * *     .withUpdateField("field-to-update")
+ * *     .withUpdateOperator("$set")
+ * *     .withNumSplits(30))

Review comment:
       In this case, only one field can be updated. Is that correct? Would it make sense to support updating more than one field?
   I see that MongoDB works with pairs of `updateOperator`+`updateField`. I wonder whether it makes sense to support something like that instead?
   
   e.g.:
   
   ```
     pipeline
       .apply(...)
       .apply(MongoDbIO.write()
         .withUri("mongodb://localhost:27017")
         .withDatabase("my-database")
         .withCollection("my-collection")
         .withIsUpdate(true)
         .withUpdateKey("key-to-match")
         .withUpdateFields(
                    "$set", "field1",
                    "$currentDate", "datefield1",
                    "$min", "special-minimum-field1")
         .withNumSplits(30))
   ```
   
   or perhaps something like this:
   
   ```
         .withUpdateFields(
                    UpdateField.of("$set", "field1"),
                    UpdateField.of("$currentDate", "datefield1"),
                    UpdateField.of("$min", "special-minimum-field1")))
   ```
   
   I also think it may be helpful to support _upsert_ operations. WDYT?
   
   I don't want to make this too complex - but it feels like these options could be supported easily, right?
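For the multi-field proposal, the pairs would need to be grouped by operator, because a MongoDB update document has one key per operator, for example `{"$set": {"status": ..., "age": ...}, "$min": {"score": ...}}`. Here is a minimal sketch that assumes hypothetical names (`UpdateDocSketch`, `buildUpdate`) and plain `Map`s instead of BSON documents, with values read from the input under the same field name:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: combines (operator, field) pairs into one update document. */
final class UpdateDocSketch {

  /** Groups pairs into {operator: {field: value, ...}, ...}. */
  static Map<String, Object> buildUpdate(
      List<Map.Entry<String, String>> operatorFieldPairs, Map<String, Object> input) {
    Map<String, Object> update = new LinkedHashMap<>();
    for (Map.Entry<String, String> pair : operatorFieldPairs) {
      // Fields that share an operator go into the same sub-document.
      @SuppressWarnings("unchecked")
      Map<String, Object> fields =
          (Map<String, Object>)
              update.computeIfAbsent(pair.getKey(), op -> new LinkedHashMap<String, Object>());
      // The value is read from the input under the same field name.
      fields.put(pair.getValue(), input.get(pair.getValue()));
    }
    return update;
  }

  public static void main(String[] args) {
    Map<String, Object> input = new LinkedHashMap<>();
    input.put("status", "active");
    input.put("age", 42);
    input.put("score", 7);

    Map<String, Object> update =
        buildUpdate(
            List.of(
                Map.entry("$set", "status"), Map.entry("$set", "age"), Map.entry("$min", "score")),
            input);
    System.out.println(update);
  }
}
```

Operators such as `$currentDate` expect a fixed value (for example `true`) rather than a value copied from the input, so a real implementation would probably need to special-case them. With the MongoDB Java driver, upserts are typically requested through `new UpdateOptions().upsert(true)`.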







[GitHub] [beam] aromanenko-dev commented on pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on pull request #14927:
URL: https://github.com/apache/beam/pull/14927#issuecomment-895957926


   Run Java PreCommit





[GitHub] [beam] pareshsarafmdb commented on a change in pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
pareshsarafmdb commented on a change in pull request #14927:
URL: https://github.com/apache/beam/pull/14927#discussion_r702419902



##########
File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
##########
@@ -856,6 +898,22 @@ public Write withBatchSize(long batchSize) {
       return builder().setBatchSize(batchSize).build();
     }
 
+    public Write withIsUpdate(boolean isUpdate) {
+      return builder().setIsUpdate(isUpdate).build();
+    }
+
+    public Write withUpdateKey(String updateKey) {
+      return builder().setUpdateKey(updateKey).build();
+    }
+
+    public Write withUpdateOperator(String updateOperator) {
+      return builder().setUpdateOperator(updateOperator).build();
+    }
+
+    public Write withUpdateField(String updateField) {
+      return builder().setUpdateField(updateField).build();
+    }
+

Review comment:
       > @pareshsarafmdb can you squash your commits? then we'll merge. Thanks!
   
   Hi @pabloem, since my changes are on the master branch, I'm having trouble squashing the commits. Would you be able to squash and merge? If not, I'll look for alternatives.







[GitHub] [beam] pabloem commented on a change in pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #14927:
URL: https://github.com/apache/beam/pull/14927#discussion_r655668926



##########
File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
##########
@@ -106,6 +109,28 @@
  *     .withNumSplits(30))
  *
  * }</pre>
+ *
+ * *
+ *
+ * <p>To configure a MongoDB sink and update, you must specify a connection {@code URI}, a {@code
+ * Database} * name, a {@code Collection} name. It matches the key with _id in target collection.
+ * For instance: * *
+ *
+ * <pre>{@code
+ * * pipeline
+ * *   .apply(...)
+ * *   .apply(MongoDbIO.write()
+ * *     .withUri("mongodb://localhost:27017")
+ * *     .withDatabase("my-database")
+ * *     .withCollection("my-collection")
+ * *     .withIsUpdate(true)
+ * *     .withUpdateKey("key-to-match")
+ * *     .withUpdateField("field-to-update")
+ * *     .withUpdateOperator("$set")
+ * *     .withNumSplits(30))

Review comment:
       This makes sense to me! : )
   
   I suppose when we update the full document, then the field names must be the same?







[GitHub] [beam] pareshsarafmdb commented on pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
pareshsarafmdb commented on pull request #14927:
URL: https://github.com/apache/beam/pull/14927#issuecomment-861405735


   > can you improve the javadoc as well? It would be helpful if I could see what an example of usage of this transform looks like
   
   I have added Javadoc for the update feature. 





[GitHub] [beam] pabloem commented on pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #14927:
URL: https://github.com/apache/beam/pull/14927#issuecomment-913338378


   Squashed using GitHub's feature.





[GitHub] [beam] pabloem commented on a change in pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #14927:
URL: https://github.com/apache/beam/pull/14927#discussion_r699560818



##########
File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/UpdateField.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@Experimental(Kind.SOURCE_SINK)
+@AutoValue
+public abstract class UpdateField implements Serializable {
+
+  abstract @Nullable String updateOperator();
+
+  abstract @Nullable String sourceField();
+
+  abstract @Nullable String destField();
+
+  private static Builder builder() {
+    return new AutoValue_UpdateField.Builder().setSourceField(null);
+  }
+
+  abstract UpdateField.Builder toBuilder();
+
+  public static UpdateField create() {
+    return builder().build();
+  }

Review comment:
       This initial constructor creates an `UpdateField` object that is not valid, right? Doesn't it make sense to make this method private, and then make `fullUpdate` and `fieldUpdate` static constructor methods?
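A sketch of the shape suggested here follows. It assumes a plain final class instead of AutoValue so that it compiles on its own. The constructor is private, so an incomplete object cannot be built, and two static factories cover the valid cases. `UpdateFieldSketch` is a hypothetical name; the method names `fieldUpdate` and `fullUpdate` follow the review discussion.

```java
import java.io.Serializable;
import java.util.Objects;

/** Hypothetical value class: only the two static factories can create instances. */
final class UpdateFieldSketch implements Serializable {
  private final String updateOperator;
  private final String sourceField; // null means "use the whole input document"
  private final String destField;

  private UpdateFieldSketch(String updateOperator, String sourceField, String destField) {
    this.updateOperator = Objects.requireNonNull(updateOperator, "updateOperator");
    this.sourceField = sourceField;
    this.destField = Objects.requireNonNull(destField, "destField");
  }

  /** Copies one source field into one destination field; sourceField must be set. */
  static UpdateFieldSketch fieldUpdate(String updateOperator, String sourceField, String destField) {
    return new UpdateFieldSketch(
        updateOperator, Objects.requireNonNull(sourceField, "sourceField"), destField);
  }

  /** Applies the operator with the entire input document as the value. */
  static UpdateFieldSketch fullUpdate(String updateOperator, String destField) {
    return new UpdateFieldSketch(updateOperator, null, destField);
  }

  String updateOperator() {
    return updateOperator;
  }

  String sourceField() {
    return sourceField;
  }

  String destField() {
    return destField;
  }

  boolean isFullUpdate() {
    return sourceField == null;
  }

  public static void main(String[] args) {
    UpdateFieldSketch perField = fieldUpdate("$set", "scientist", "scientist");
    UpdateFieldSketch whole = fullUpdate("$push", "history");
    System.out.println(perField.isFullUpdate() + " " + whole.isFullUpdate());
  }
}
```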

##########
File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/UpdateConfiguration.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Builds a MongoDB UpdateConfiguration object. */
+@Experimental(Kind.SOURCE_SINK)
+@AutoValue
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public abstract class UpdateConfiguration implements Serializable {
+
+  abstract @Nullable String updateKey();

Review comment:
       Can we have more than one field as a key?

##########
File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
##########
@@ -106,6 +112,27 @@
  *     .withNumSplits(30))
  *
  * }</pre>
+ *
+ * *
+ *
+ * <p>To configure a MongoDB sink and update, you must specify a connection {@code URI}, a {@code
+ * Database} * name, a {@code Collection} name. It matches the key with _id in target collection.
+ * For instance: * *
+ *
+ * <pre>{@code
+ * * pipeline
+ * *   .apply(...)
+ * *   .apply(MongoDbIO.write()
+ * *     .withUri("mongodb://localhost:27017")
+ * *     .withDatabase("my-database")
+ * *     .withCollection("my-collection")
+ * *     .withUpdateConfiguration(UpdateConfiguration.create().withUpdateKey("key1")
+ * *     .withUpdateFields(UpdateField.of("$set", "source-field1", "dest-field1"),
+ * *                       UpdateField.of("$set","source-field2", "dest-field2"),
+ * *                       //pushes entire input doc to the dest field
+ * *                        UpdateField.of("$push", "dest-field3") )));

Review comment:
       We're going to have to update the Javadoc here to match the final API.







[GitHub] [beam] pareshsarafmdb commented on a change in pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
pareshsarafmdb commented on a change in pull request #14927:
URL: https://github.com/apache/beam/pull/14927#discussion_r689483950



##########
File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/UpdateField.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import java.io.Serializable;
+
+public class UpdateField implements Serializable {

Review comment:
       @pabloem I have changed it to an AutoValue class with two methods, fieldUpdate and fullUpdate. Example below:
   MongoDbIO.write()
                   .withUri("mongodb://localhost:" + port)
                   .withDatabase(DATABASE)
                   .withCollection(collectionName)
                   .withUpdateConfiguration(
                       UpdateConfiguration.create()
                           .withUpdateKey("id")
                           .withUpdateFields(
                               UpdateField.create().fieldUpdate("$set", "scientist", "scientist"),
                               UpdateField.create().fieldUpdate("$set", "country", "country"))));
   Please review and let me know. @aromanenko-dev, please trigger the build; the error should hopefully be gone now. 







[GitHub] [beam] aaltay commented on a change in pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
aaltay commented on a change in pull request #14927:
URL: https://github.com/apache/beam/pull/14927#discussion_r675288003



##########
File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
##########
@@ -910,15 +973,24 @@ public void startBundle() {
       public void processElement(ProcessContext ctx) {
         // Need to copy the document because mongoCollection.insertMany() will mutate it
         // before inserting (will assign an id).
+
         batch.add(new Document(ctx.element()));
         if (batch.size() >= spec.batchSize()) {
-          flush();
+          if (spec.isUpdate()) {

Review comment:
       (Pablo is on vacation until August, please ping again in early August.)







[GitHub] [beam] pabloem commented on pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #14927:
URL: https://github.com/apache/beam/pull/14927#issuecomment-859794415


   can you improve the javadoc as well? It would be helpful if I could see what an example of usage of this transform looks like





[GitHub] [beam] pareshsarafmdb commented on a change in pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
pareshsarafmdb commented on a change in pull request #14927:
URL: https://github.com/apache/beam/pull/14927#discussion_r699865890



##########
File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/UpdateConfiguration.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Builds a MongoDB UpdateConfiguration object. */
+@Experimental(Kind.SOURCE_SINK)
+@AutoValue
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public abstract class UpdateConfiguration implements Serializable {
+
+  abstract @Nullable String updateKey();

Review comment:
       Right now only one key is supported, and it is matched against the _id field in MongoDB. I thought we could enhance this in later releases. 
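The single-key behavior described in this reply amounts to building a filter `{"_id": <value of updateKey>}` from each input document. This is a minimal illustration with a hypothetical `UpdateFilterSketch` class and plain `Map`s. With the MongoDB Java driver, the equivalent filter would usually be `Filters.eq("_id", value)`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: derives the update filter from a single configured key. */
final class UpdateFilterSketch {

  /** Returns {"_id": input[updateKey]}; fails fast if the input lacks the key field. */
  static Map<String, Object> filterFor(String updateKey, Map<String, Object> input) {
    if (!input.containsKey(updateKey)) {
      throw new IllegalArgumentException("Input document has no field '" + updateKey + "'");
    }
    Map<String, Object> filter = new LinkedHashMap<>();
    filter.put("_id", input.get(updateKey));
    return filter;
  }

  public static void main(String[] args) {
    Map<String, Object> row = new LinkedHashMap<>();
    row.put("id", 7);
    row.put("scientist", "Einstein");
    System.out.println(filterFor("id", row));
  }
}
```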







[GitHub] [beam] pareshsarafmdb commented on a change in pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
pareshsarafmdb commented on a change in pull request #14927:
URL: https://github.com/apache/beam/pull/14927#discussion_r662054322



##########
File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
##########
@@ -106,6 +109,28 @@
  *     .withNumSplits(30))
  *
  * }</pre>
+ *
+ * *
+ *
+ * <p>To configure a MongoDB sink and update, you must specify a connection {@code URI}, a {@code
+ * Database} * name, a {@code Collection} name. It matches the key with _id in target collection.
+ * For instance: * *
+ *
+ * <pre>{@code
+ * * pipeline
+ * *   .apply(...)
+ * *   .apply(MongoDbIO.write()
+ * *     .withUri("mongodb://localhost:27017")
+ * *     .withDatabase("my-database")
+ * *     .withCollection("my-collection")
+ * *     .withIsUpdate(true)
+ * *     .withUpdateKey("key-to-match")
+ * *     .withUpdateField("field-to-update")
+ * *     .withUpdateOperator("$set")
+ * *     .withNumSplits(30))

Review comment:
       Yes. I have implemented most of it and am testing it now. 







[GitHub] [beam] aaltay commented on pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
aaltay commented on pull request #14927:
URL: https://github.com/apache/beam/pull/14927#issuecomment-906833409


   @pareshsarafmdb - Could you look at the test issues?





[GitHub] [beam] pareshsarafmdb commented on pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
pareshsarafmdb commented on pull request #14927:
URL: https://github.com/apache/beam/pull/14927#issuecomment-895905883


   > > @pareshsarafmdb No worries, it's just a command to run `Java PreCommit` job. Actually, it fails (you can see the results by clicking on "Details" against `Java ("Run Java PreCommit")` check):
   > > ```
   > > 20:47:11 > Task :sdks:java:io:mongodb:compileJava
   > > 20:47:11 /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_Phrase/src/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/UpdateField.java:43: error: [argument.type.incompatible] incompatible argument for parameter sourceField of UpdateField.
   > > 20:47:11     return new UpdateField(updateOperator, null, destField);
   > > 20:47:11                                            ^
   > > 20:47:11   found   : null (NullType)
   > > 20:47:11   required: @Initialized @NonNull String
   > > ```
   > > 
   > > I'd recommend running `./gradlew :sdks:java:io:mongodb:check` locally before pushing your changes to make sure that everything is fine.
   > 
   > I have run it and verified it again. The build has no issues locally.
   > BUILD SUCCESSFUL in 3m 4s
   > 58 actionable tasks: 58 executed
   
   Also, looking at the console log, the error doesn't seem to be related to the changes I made. Could you please check, @aromanenko-dev? 





[GitHub] [beam] pareshsarafmdb commented on a change in pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
pareshsarafmdb commented on a change in pull request #14927:
URL: https://github.com/apache/beam/pull/14927#discussion_r654112756



##########
File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
##########
@@ -106,6 +109,28 @@
  *     .withNumSplits(30))
  *
  * }</pre>
+ *
+ * *
+ *
+ * <p>To configure a MongoDB sink and update, you must specify a connection {@code URI}, a {@code
+ * Database} * name, a {@code Collection} name. It matches the key with _id in target collection.
+ * For instance: * *
+ *
+ * <pre>{@code
+ * * pipeline
+ * *   .apply(...)
+ * *   .apply(MongoDbIO.write()
+ * *     .withUri("mongodb://localhost:27017")
+ * *     .withDatabase("my-database")
+ * *     .withCollection("my-collection")
+ * *     .withIsUpdate(true)
+ * *     .withUpdateKey("key-to-match")
+ * *     .withUpdateField("field-to-update")
+ * *     .withUpdateOperator("$set")
+ * *     .withNumSplits(30))

Review comment:
       Thanks for the clarification! This makes sense. But in a real-time sync (CDC) scenario, it will mostly be rows coming in and being added to an array or set within MongoDB. For example, say we have a customer collection: each new transaction made by a customer is captured and should be added to the transactions array within that customer's document. So we definitely need an option to update with the full document. I think we can add both options: updating with all the fields together, and updating field by field. That would also make the connector more feature-rich. What do you think?
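To make the CDC example concrete, here is a hypothetical in-memory model of the two array operators mentioned: `$push` always appends, and `$addToSet` appends only if the value is absent. It only approximates the server's behavior. Both operators create the array when the field is missing and fail when the field holds a non-array value. Java `equals` stands in for MongoDB's value comparison. `PushSketch` is an illustrative name, not part of the PR.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of $push / $addToSet on a single document. */
final class PushSketch {

  /** Returns the array stored at arrayField, creating it when the field is absent. */
  @SuppressWarnings("unchecked")
  private static List<Object> arrayAt(Map<String, Object> target, String arrayField) {
    if (!target.containsKey(arrayField)) {
      List<Object> created = new ArrayList<>();
      target.put(arrayField, created);
      return created;
    }
    Object existing = target.get(arrayField);
    if (existing instanceof List) {
      return (List<Object>) existing;
    }
    throw new IllegalStateException("Field '" + arrayField + "' is not an array");
  }

  /** Like $push: always appends the value (e.g. a whole incoming transaction document). */
  static void push(Map<String, Object> target, String arrayField, Object value) {
    arrayAt(target, arrayField).add(value);
  }

  /** Like $addToSet: appends only if an equal value is not already present. */
  static void addToSet(Map<String, Object> target, String arrayField, Object value) {
    List<Object> array = arrayAt(target, arrayField);
    if (!array.contains(value)) {
      array.add(value);
    }
  }

  public static void main(String[] args) {
    Map<String, Object> customer = new LinkedHashMap<>();
    customer.put("_id", "c1");

    Map<String, Object> tx = new LinkedHashMap<>();
    tx.put("txId", 1);
    tx.put("amount", 25);

    push(customer, "transactions", tx); // creates the array with tx as its first element
    addToSet(customer, "tags", "vip");
    addToSet(customer, "tags", "vip"); // duplicate, so it is ignored
    System.out.println(customer);
  }
}
```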

##########
File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
##########
@@ -766,6 +800,14 @@ private MongoClient createClient(Read spec) {
 
       abstract Builder setBatchSize(long batchSize);
 
+      abstract Builder setIsUpdate(boolean isUpdate);

Review comment:
       Thank you for the feedback. This should make it easy and clean. I will work on it. 

##########
File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
##########
@@ -106,6 +109,28 @@
  *     .withNumSplits(30))
  *
  * }</pre>
+ *
+ * *
+ *
+ * <p>To configure a MongoDB sink and update, you must specify a connection {@code URI}, a {@code
+ * Database} * name, a {@code Collection} name. It matches the key with _id in target collection.
+ * For instance: * *
+ *
+ * <pre>{@code
+ * * pipeline
+ * *   .apply(...)
+ * *   .apply(MongoDbIO.write()
+ * *     .withUri("mongodb://localhost:27017")
+ * *     .withDatabase("my-database")
+ * *     .withCollection("my-collection")
+ * *     .withIsUpdate(true)
+ * *     .withUpdateKey("key-to-match")
+ * *     .withUpdateField("field-to-update")
+ * *     .withUpdateOperator("$set")
+ * *     .withNumSplits(30))

Review comment:
       Another challenge: right now your example assumes that the field/column name in the source is the same as the field name to be updated in the target. In most cases they can differ, so it might look like this:
   .withUpdateKey("name")
         .withUpdateFields(
                    UpdateField.of("$set", "status", "dest-status-field-in-mdb"),
                    UpdateField.of("$currentDate", "lastUpdated", "dest-lastUpdated-field-in-mdb"),
                    UpdateField.of("$set", "age", "dest-age-field-in-mdb")))







[GitHub] [beam] pareshsarafmdb commented on a change in pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
pareshsarafmdb commented on a change in pull request #14927:
URL: https://github.com/apache/beam/pull/14927#discussion_r699846274



##########
File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/UpdateField.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@Experimental(Kind.SOURCE_SINK)
+@AutoValue
+public abstract class UpdateField implements Serializable {
+
+  abstract @Nullable String updateOperator();
+
+  abstract @Nullable String sourceField();
+
+  abstract @Nullable String destField();
+
+  private static Builder builder() {
+    return new AutoValue_UpdateField.Builder().setSourceField(null);
+  }
+
+  abstract UpdateField.Builder toBuilder();
+
+  public static UpdateField create() {
+    return builder().build();
+  }
+
+  @AutoValue.Builder
+  abstract static class Builder {
+    abstract UpdateField.Builder setUpdateOperator(@Nullable String updateOperator);
+
+    abstract UpdateField.Builder setSourceField(@Nullable String sourceField);
+
+    abstract UpdateField.Builder setDestField(@Nullable String destField);
+
+    abstract UpdateField build();
+  }
+
+  /** Sets the limit of documents to find. */
+  public UpdateField fullUpdate(String updateOperator, String destField) {
+    return toBuilder().setUpdateOperator(updateOperator).setDestField(destField).build();
+  }

Review comment:
       Yes, that's right. That's intended.

##########
File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/UpdateConfiguration.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Builds a MongoDB UpdateConfiguration object. */
+@Experimental(Kind.SOURCE_SINK)
+@AutoValue
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public abstract class UpdateConfiguration implements Serializable {
+
+  abstract @Nullable String updateKey();

Review comment:
       Right now there is only one key, which is matched against the _id field in MongoDB. I thought we could extend this in future releases.
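Here is a minimal sketch of the single-key matching described above. It assumes documents are modeled as plain `java.util.Map`s rather than the driver's `org.bson.Document`. The names `UpdateKeyFilterSketch` and `idFilter` are hypothetical and are not part of MongoDbIO. For one input element, it builds the filter `{"_id": <value of the update key>}`:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: plain Maps stand in for BSON documents.
final class UpdateKeyFilterSketch {
  private UpdateKeyFilterSketch() {}

  /** Returns {"_id": <value of updateKey in input>}; fails if the key field is missing. */
  static Map<String, Object> idFilter(Map<String, Object> input, String updateKey) {
    if (!input.containsKey(updateKey)) {
      throw new IllegalArgumentException("Input document has no field named " + updateKey);
    }
    Map<String, Object> filter = new LinkedHashMap<>();
    filter.put("_id", input.get(updateKey));
    return filter;
  }

  public static void main(String[] args) {
    Map<String, Object> input = new LinkedHashMap<>();
    input.put("key1", "mykey1");
    input.put("status", "active");
    System.out.println(idFilter(input, "key1")); // prints {_id=mykey1}
  }
}
```

To match on several fields, you would add more entries to the same filter map, because MongoDB combines top-level filter fields with an implicit AND.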

##########
File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
##########
@@ -106,6 +112,27 @@
  *     .withNumSplits(30))
  *
  * }</pre>
+ *
+ * *
+ *
+ * <p>To configure a MongoDB sink and update, you must specify a connection {@code URI}, a {@code
+ * Database} * name, a {@code Collection} name. It matches the key with _id in target collection.
+ * For instance: * *
+ *
+ * <pre>{@code
+ * * pipeline
+ * *   .apply(...)
+ * *   .apply(MongoDbIO.write()
+ * *     .withUri("mongodb://localhost:27017")
+ * *     .withDatabase("my-database")
+ * *     .withCollection("my-collection")
+ * *     .withUpdateConfiguration(UpdateConfiguration.create().withUpdateKey("key1")
+ * *     .withUpdateFields(UpdateField.of("$set", "source-field1", "dest-field1"),
+ * *                       UpdateField.of("$set","source-field2", "dest-field2"),
+ * *                       //pushes entire input doc to the dest field
+ * *                        UpdateField.of("$push", "dest-field3") )));

Review comment:
       I have done it. 

##########
File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/UpdateField.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@Experimental(Kind.SOURCE_SINK)
+@AutoValue
+public abstract class UpdateField implements Serializable {
+
+  abstract @Nullable String updateOperator();
+
+  abstract @Nullable String sourceField();
+
+  abstract @Nullable String destField();
+
+  private static Builder builder() {
+    return new AutoValue_UpdateField.Builder().setSourceField(null);
+  }
+
+  abstract UpdateField.Builder toBuilder();
+
+  public static UpdateField create() {
+    return builder().build();
+  }

Review comment:
       I have made the change.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pabloem commented on pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #14927:
URL: https://github.com/apache/beam/pull/14927#issuecomment-913312643


   Run Java PostCommit





[GitHub] [beam] pabloem commented on pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #14927:
URL: https://github.com/apache/beam/pull/14927#issuecomment-910523773


   Okay, this LGTM. Happy to merge. @aromanenko-dev, any other comments? : )





[GitHub] [beam] pabloem commented on a change in pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #14927:
URL: https://github.com/apache/beam/pull/14927#discussion_r699560818



##########
File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/UpdateField.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@Experimental(Kind.SOURCE_SINK)
+@AutoValue
+public abstract class UpdateField implements Serializable {
+
+  abstract @Nullable String updateOperator();
+
+  abstract @Nullable String sourceField();
+
+  abstract @Nullable String destField();
+
+  private static Builder builder() {
+    return new AutoValue_UpdateField.Builder().setSourceField(null);
+  }
+
+  abstract UpdateField.Builder toBuilder();
+
+  public static UpdateField create() {
+    return builder().build();
+  }

Review comment:
       This initial constructor creates an `UpdateField` object that is not valid, right? Doesn't it make sense to make this method private, and then make `fullUpdate` and `fieldUpdate` static constructor methods?
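Here is a plain-Java sketch of the suggested shape. It uses a private constructor and two static factories, so no half-initialized instance can be created. It avoids AutoValue so that it stays self-contained. `UpdateFieldSketch` and its members are hypothetical names, not the class in this PR:

```java
import java.io.Serializable;
import java.util.Objects;

// Hypothetical sketch of "private constructor + static factories"; not the Beam class.
final class UpdateFieldSketch implements Serializable {
  private static final long serialVersionUID = 1L;

  private final String updateOperator;
  private final String sourceField; // null means "use the whole input element"
  private final String destField;

  private UpdateFieldSketch(String updateOperator, String sourceField, String destField) {
    this.updateOperator = Objects.requireNonNull(updateOperator, "updateOperator");
    this.sourceField = sourceField;
    this.destField = Objects.requireNonNull(destField, "destField");
  }

  /** Writes the value of sourceField from the input element into destField. */
  static UpdateFieldSketch fieldUpdate(String updateOperator, String sourceField, String destField) {
    return new UpdateFieldSketch(
        updateOperator, Objects.requireNonNull(sourceField, "sourceField"), destField);
  }

  /** Writes the entire input element into destField. */
  static UpdateFieldSketch fullUpdate(String updateOperator, String destField) {
    return new UpdateFieldSketch(updateOperator, null, destField);
  }

  String updateOperator() {
    return updateOperator;
  }

  String sourceField() {
    return sourceField;
  }

  String destField() {
    return destField;
  }

  boolean isFullUpdate() {
    return sourceField == null;
  }
}
```

With this shape, the only way to get an instance is through a factory that validates its arguments.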

##########
File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/UpdateConfiguration.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Builds a MongoDB UpdateConfiguration object. */
+@Experimental(Kind.SOURCE_SINK)
+@AutoValue
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public abstract class UpdateConfiguration implements Serializable {
+
+  abstract @Nullable String updateKey();

Review comment:
       can we have more than one field as a key?

##########
File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
##########
@@ -106,6 +112,27 @@
  *     .withNumSplits(30))
  *
  * }</pre>
+ *
+ * *
+ *
+ * <p>To configure a MongoDB sink and update, you must specify a connection {@code URI}, a {@code
+ * Database} * name, a {@code Collection} name. It matches the key with _id in target collection.
+ * For instance: * *
+ *
+ * <pre>{@code
+ * * pipeline
+ * *   .apply(...)
+ * *   .apply(MongoDbIO.write()
+ * *     .withUri("mongodb://localhost:27017")
+ * *     .withDatabase("my-database")
+ * *     .withCollection("my-collection")
+ * *     .withUpdateConfiguration(UpdateConfiguration.create().withUpdateKey("key1")
+ * *     .withUpdateFields(UpdateField.of("$set", "source-field1", "dest-field1"),
+ * *                       UpdateField.of("$set","source-field2", "dest-field2"),
+ * *                       //pushes entire input doc to the dest field
+ * *                        UpdateField.of("$push", "dest-field3") )));

Review comment:
       we're going to have to update the Javadoc here to match the final API

##########
File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/UpdateField.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@Experimental(Kind.SOURCE_SINK)
+@AutoValue
+public abstract class UpdateField implements Serializable {
+
+  abstract @Nullable String updateOperator();
+
+  abstract @Nullable String sourceField();
+
+  abstract @Nullable String destField();
+
+  private static Builder builder() {
+    return new AutoValue_UpdateField.Builder().setSourceField(null);
+  }
+
+  abstract UpdateField.Builder toBuilder();
+
+  public static UpdateField create() {
+    return builder().build();
+  }
+
+  @AutoValue.Builder
+  abstract static class Builder {
+    abstract UpdateField.Builder setUpdateOperator(@Nullable String updateOperator);
+
+    abstract UpdateField.Builder setSourceField(@Nullable String sourceField);
+
+    abstract UpdateField.Builder setDestField(@Nullable String destField);
+
+    abstract UpdateField build();
+  }
+
+  /** Sets the limit of documents to find. */
+  public UpdateField fullUpdate(String updateOperator, String destField) {
+    return toBuilder().setUpdateOperator(updateOperator).setDestField(destField).build();
+  }

Review comment:
       In this case, we're inserting the full record into one field of the destination, right?
   
   e.g.:
   original
   ```
   {
     "key": "mykey1",
     "value1": "myvalue1",
     "value2": ["1", "2"],
     "value3": "thisvalue"
   }
   ```
   
   newvalue
   ```
   {
     "key": "mykey1",
     "value1": "myvalue1UPD",
     "value2": ["1", "2", "3", "4"],
     "value3": "thisvalueISUPDATED"
   }
   ```
   
   Given configuration `UpdateField.fullUpdate("$push", "value4")`:
   result
   ```
   {
     "key": "mykey1",
     "value1": "myvalue1",
     "value2": ["1", "2"],
     "value3": "thisvalue",
     "value4": {
       "key": "mykey1",
       "value1": "myvalue1UPD",
       "value2": ["1", "2", "3", "4"],
       "value3": "thisvalueISUPDATED"
     }
   }
   ```
   
   Is this correct / intended?
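Here is a minimal sketch of the update document that such a full update would correspond to. It assumes the operator and destination field map directly onto `{"<operator>": {"<destField>": <input element>}}`. `FullUpdateSketch` and `fullUpdateDoc` are hypothetical names, and plain `Map`s stand in for BSON documents:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: Maps stand in for org.bson.Document.
final class FullUpdateSketch {
  private FullUpdateSketch() {}

  /** Builds {"<operator>": {"<destField>": <copy of the whole input element>}}. */
  static Map<String, Object> fullUpdateDoc(
      String operator, String destField, Map<String, Object> input) {
    Map<String, Object> operand = new LinkedHashMap<>();
    operand.put(destField, new LinkedHashMap<>(input)); // copy so later changes to input don't leak in
    Map<String, Object> update = new LinkedHashMap<>();
    update.put(operator, operand);
    return update;
  }

  public static void main(String[] args) {
    Map<String, Object> input = new LinkedHashMap<>();
    input.put("key", "mykey1");
    input.put("value1", "myvalue1UPD");
    System.out.println(fullUpdateDoc("$push", "value4", input));
    // prints {$push={value4={key=mykey1, value1=myvalue1UPD}}}
  }
}
```

Against a real collection, `$push` appends to an array and creates the array if the field is absent. So `value4` would end up as an array containing the new record. `$set` would instead store the record as a single embedded document.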







[GitHub] [beam] pareshsarafmdb commented on a change in pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
pareshsarafmdb commented on a change in pull request #14927:
URL: https://github.com/apache/beam/pull/14927#discussion_r652322951



##########
File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
##########
@@ -106,6 +109,28 @@
  *     .withNumSplits(30))
  *
  * }</pre>
+ *
+ * *
+ *
+ * <p>To configure a MongoDB sink and update, you must specify a connection {@code URI}, a {@code
+ * Database} * name, a {@code Collection} name. It matches the key with _id in target collection.
+ * For instance: * *
+ *
+ * <pre>{@code
+ * * pipeline
+ * *   .apply(...)
+ * *   .apply(MongoDbIO.write()
+ * *     .withUri("mongodb://localhost:27017")
+ * *     .withDatabase("my-database")
+ * *     .withCollection("my-collection")
+ * *     .withIsUpdate(true)
+ * *     .withUpdateKey("key-to-match")
+ * *     .withUpdateField("field-to-update")
+ * *     .withUpdateOperator("$set")
+ * *     .withNumSplits(30))

Review comment:
       Hi Pablo, thanks for the input. I thought the same, but I am confused about how the actual values come in. With a single-field update, whatever value comes in from the pipeline is set as the value of that field. If we have to update multiple fields, how can that be done? I'm not sure how this works in Beam. I hope my question is clear.
   
   Upsert functionality can definitely be added.







[GitHub] [beam] pabloem commented on pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #14927:
URL: https://github.com/apache/beam/pull/14927#issuecomment-912053679


   @pareshsarafmdb can you squash your commits? then we'll merge. Thanks!





[GitHub] [beam] pabloem commented on pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #14927:
URL: https://github.com/apache/beam/pull/14927#issuecomment-896229480


   the precommit seems to be failing due to a static check issue:
   
   ![image](https://user-images.githubusercontent.com/1301740/128917476-d55ee8bc-2a4c-4335-b90a-735ea826cd71.png)
   





[GitHub] [beam] pareshsarafmdb commented on pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
pareshsarafmdb commented on pull request #14927:
URL: https://github.com/apache/beam/pull/14927#issuecomment-909888653


   > > Just added minor stylistic comments.
   > > I still wonder: What happens when users want to update the whole old record into the whole new record?
   > > Finally, I have one more question about a later improvement to consider for this IO - is it worth thinking later about passing a lambda to choose the destination collection?
   > 
   > @pabloem If I understand correctly, you're asking what happens if a user wants to update a destination document with the entire source document. In that case there is an overloaded method that takes just the update operator and the destination field; it performs a full-document update. Does that make sense?
   
   Yes, that's right.





[GitHub] [beam] pareshsarafmdb commented on pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
pareshsarafmdb commented on pull request #14927:
URL: https://github.com/apache/beam/pull/14927#issuecomment-895717902


   > Run Java PreCommit
   
   Could you please help me with how to do this? Thanks very much!





[GitHub] [beam] pabloem commented on a change in pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #14927:
URL: https://github.com/apache/beam/pull/14927#discussion_r699563891



##########
File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/UpdateField.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@Experimental(Kind.SOURCE_SINK)
+@AutoValue
+public abstract class UpdateField implements Serializable {
+
+  abstract @Nullable String updateOperator();
+
+  abstract @Nullable String sourceField();
+
+  abstract @Nullable String destField();
+
+  private static Builder builder() {
+    return new AutoValue_UpdateField.Builder().setSourceField(null);
+  }
+
+  abstract UpdateField.Builder toBuilder();
+
+  public static UpdateField create() {
+    return builder().build();
+  }
+
+  @AutoValue.Builder
+  abstract static class Builder {
+    abstract UpdateField.Builder setUpdateOperator(@Nullable String updateOperator);
+
+    abstract UpdateField.Builder setSourceField(@Nullable String sourceField);
+
+    abstract UpdateField.Builder setDestField(@Nullable String destField);
+
+    abstract UpdateField build();
+  }
+
+  /** Sets the limit of documents to find. */
+  public UpdateField fullUpdate(String updateOperator, String destField) {
+    return toBuilder().setUpdateOperator(updateOperator).setDestField(destField).build();
+  }

Review comment:
       In this case, we're inserting the full record into one field of the destination, right?
   
   e.g.:
   original
   ```
   {
     "key": "mykey1",
     "value1": "myvalue1",
     "value2": ["1", "2"],
     "value3": "thisvalue"
   }
   ```
   
   newvalue
   ```
   {
     "key": "mykey1",
     "value1": "myvalue1UPD",
     "value2": ["1", "2", "3", "4"],
     "value3": "thisvalueISUPDATED"
   }
   ```
   
   Given configuration `UpdateField.fullUpdate("$push", "value4")`:
   result
   ```
   {
     "key": "mykey1",
     "value1": "myvalue1",
     "value2": ["1", "2"],
     "value3": "thisvalue",
     "value4": {
       "key": "mykey1",
       "value1": "myvalue1UPD",
       "value2": ["1", "2", "3", "4"],
       "value3": "thisvalueISUPDATED"
     }
   }
   ```
   
   Is this correct / intended?







[GitHub] [beam] pabloem commented on pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #14927:
URL: https://github.com/apache/beam/pull/14927#issuecomment-895453693









[GitHub] [beam] pareshsarafmdb commented on pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
pareshsarafmdb commented on pull request #14927:
URL: https://github.com/apache/beam/pull/14927#issuecomment-899427042


   > Just added minor stylistic comments.
   > 
   > I still wonder: What happens when users want to update the whole old record into the whole new record?
   > 
   > Finally, I have one more question about a later improvement to consider for this IO - is it worth thinking later about passing a lambda to choose the destination collection?
   
   @pabloem If I understand correctly, you're asking what happens if a user wants to update a destination document with the entire source document. In that case there is an overloaded method that takes just the update operator and the destination field; it performs a full-document update. Does that make sense?
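The later-improvement question was whether the destination collection could be chosen with a lambda. One possible shape is to group each batch with a per-element function before writing. This is a hypothetical sketch: `CollectionRoutingSketch` and `SerializableFn` are illustrative names, and Beam's own `SerializableFunction` would be the natural choice inside the connector. The per-collection write itself is left out:

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of routing elements to collections with a user-supplied function.
final class CollectionRoutingSketch {
  private CollectionRoutingSketch() {}

  /** A Function that is also Serializable, so it could be shipped to workers. */
  interface SerializableFn<I, O> extends Function<I, O>, Serializable {}

  /** Groups a batch by the collection name that the chooser returns for each element. */
  static Map<String, List<Map<String, Object>>> groupByCollection(
      List<Map<String, Object>> batch, SerializableFn<Map<String, Object>, String> chooser) {
    Map<String, List<Map<String, Object>>> byCollection = new LinkedHashMap<>();
    for (Map<String, Object> doc : batch) {
      byCollection.computeIfAbsent(chooser.apply(doc), name -> new ArrayList<>()).add(doc);
    }
    return byCollection; // each entry would then be written to its own collection
  }
}
```

Grouping first keeps one bulk write per collection, instead of one write per element.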





[GitHub] [beam] aromanenko-dev commented on pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on pull request #14927:
URL: https://github.com/apache/beam/pull/14927#issuecomment-911606245


   Thanks, I'm fine with code changes. LGTM.
   
   However, before merging, please, squash all commits and avoid the `merge` commits - please, use `rebase` instead. 
   
   Also, it's not good practice to create a PR from the `master` branch; it's much better to use a dedicated feature branch. That said, I think we can live with it for now.





[GitHub] [beam] pareshsarafmdb commented on a change in pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
pareshsarafmdb commented on a change in pull request #14927:
URL: https://github.com/apache/beam/pull/14927#discussion_r665446598



##########
File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
##########
@@ -910,15 +973,24 @@ public void startBundle() {
       public void processElement(ProcessContext ctx) {
         // Need to copy the document because mongoCollection.insertMany() will mutate it
         // before inserting (will assign an id).
+
         batch.add(new Document(ctx.element()));
         if (batch.size() >= spec.batchSize()) {
-          flush();
+          if (spec.isUpdate()) {

Review comment:
       @aromanenko-dev @pabloem gentle reminder to review
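The buffer-then-flush logic in the hunk above can be sketched in plain Java. This is an illustrative sketch, not the connector code. `BatchingWriterSketch` is a hypothetical name, and the two `Consumer`s stand in for the real insert and update writes:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of buffering elements and flushing them to an insert or update path.
final class BatchingWriterSketch<T> {
  private final int batchSize;
  private final boolean isUpdate;
  private final Consumer<List<T>> insertMany; // stand-in for the insert path
  private final Consumer<List<T>> updateMany; // stand-in for the update path
  private final List<T> batch = new ArrayList<>();

  BatchingWriterSketch(
      int batchSize, boolean isUpdate, Consumer<List<T>> insertMany, Consumer<List<T>> updateMany) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    this.batchSize = batchSize;
    this.isUpdate = isUpdate;
    this.insertMany = insertMany;
    this.updateMany = updateMany;
  }

  /** Buffers one element and flushes once the batch is full. */
  void add(T element) {
    batch.add(element);
    if (batch.size() >= batchSize) {
      flush();
    }
  }

  /** Writes any buffered elements; also call at the end of a bundle so a partial batch is kept. */
  void flush() {
    if (batch.isEmpty()) {
      return;
    }
    List<T> toWrite = new ArrayList<>(batch); // hand off a copy, then reset the buffer
    batch.clear();
    if (isUpdate) {
      updateMany.accept(toWrite);
    } else {
      insertMany.accept(toWrite);
    }
  }
}
```

Choosing the write path inside `flush()` keeps `add()` identical for both modes, so only one place needs to know about updates.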







[GitHub] [beam] pabloem commented on a change in pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #14927:
URL: https://github.com/apache/beam/pull/14927#discussion_r661802399



##########
File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
##########
@@ -106,6 +109,28 @@
  *     .withNumSplits(30))
  *
  * }</pre>
+ *
+ * *
+ *
+ * <p>To configure a MongoDB sink and update, you must specify a connection {@code URI}, a {@code
+ * Database} * name, a {@code Collection} name. It matches the key with _id in target collection.
+ * For instance: * *
+ *
+ * <pre>{@code
+ * * pipeline
+ * *   .apply(...)
+ * *   .apply(MongoDbIO.write()
+ * *     .withUri("mongodb://localhost:27017")
+ * *     .withDatabase("my-database")
+ * *     .withCollection("my-collection")
+ * *     .withIsUpdate(true)
+ * *     .withUpdateKey("key-to-match")
+ * *     .withUpdateField("field-to-update")
+ * *     .withUpdateOperator("$set")
+ * *     .withNumSplits(30))

Review comment:
       just for my information - is this something that you'll want to take on?







[GitHub] [beam] pabloem commented on a change in pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #14927:
URL: https://github.com/apache/beam/pull/14927#discussion_r653794490



##########
File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
##########
@@ -106,6 +109,28 @@
  *     .withNumSplits(30))
  *
  * }</pre>
+ *
+ * *
+ *
+ * <p>To configure a MongoDB sink and update, you must specify a connection {@code URI}, a {@code
+ * Database} * name, a {@code Collection} name. It matches the key with _id in target collection.
+ * For instance: * *
+ *
+ * <pre>{@code
+ * * pipeline
+ * *   .apply(...)
+ * *   .apply(MongoDbIO.write()
+ * *     .withUri("mongodb://localhost:27017")
+ * *     .withDatabase("my-database")
+ * *     .withCollection("my-collection")
+ * *     .withIsUpdate(true)
+ * *     .withUpdateKey("key-to-match")
+ * *     .withUpdateField("field-to-update")
+ * *     .withUpdateOperator("$set")
+ * *     .withNumSplits(30))

Review comment:
       My guess is that we may have a document like this in the database:
   
   ```
   {
    "name": "ironman",
    "status": "active",
    "age": 55,
    "location": "malibu",
    "lastUpdated": "(SOME TIMESTAMP)"
   }
   ```
   
   And perhaps the PCollection that comes as input contains the following element:
   
   ```
   {
    "name": "ironman",
    "status": "inactive",
    "age": 56,
    "location": "malibu"
   }
   ```
   
   In this case, we could do:
   ```
         .withUpdateKey("name")
         .withUpdateFields(
                    UpdateField.of("$set", "status"),
                    UpdateField.of("$currentDate", "lastUpdated"),
                    UpdateField.of("$set", "age")))
   ```
   
   Does that make sense? Another thing I wonder about is - what if we just want to update all the fields without having to list them all one by one on the `updateFields` attribute of the transform.
   
   thoughts?
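This also answers the earlier question of where the values come from: each update entry reads its value from the current input element. Below is a minimal sketch that assumes hypothetical `(operator, sourceField, destField)` entries and plain `Map` documents. It also merges entries that share an operator, because a Java `Map` (and therefore a driver `Document`) cannot hold two `"$set"` keys:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: one update document per input element, grouped by operator.
final class FieldUpdateSketch {
  private FieldUpdateSketch() {}

  /** Hypothetical stand-in for one (operator, sourceField, destField) entry. */
  static final class Spec {
    final String operator;
    final String sourceField;
    final String destField;

    Spec(String operator, String sourceField, String destField) {
      this.operator = operator;
      this.sourceField = sourceField;
      this.destField = destField;
    }
  }

  /** Builds the update for one element, merging entries that share an operator. */
  static Map<String, Object> buildUpdate(Map<String, Object> input, List<Spec> specs) {
    Map<String, Object> update = new LinkedHashMap<>();
    for (Spec spec : specs) {
      @SuppressWarnings("unchecked")
      Map<String, Object> operand =
          (Map<String, Object>)
              update.computeIfAbsent(spec.operator, op -> new LinkedHashMap<String, Object>());
      // The value comes from the current element; a missing source field yields null here.
      operand.put(spec.destField, input.get(spec.sourceField));
    }
    return update;
  }

  public static void main(String[] args) {
    Map<String, Object> input = new LinkedHashMap<>();
    input.put("name", "ironman");
    input.put("status", "inactive");
    input.put("age", 56);
    input.put("location", "malibu");
    List<Spec> specs =
        List.of(new Spec("$set", "status", "status"), new Spec("$set", "age", "age"));
    System.out.println(buildUpdate(input, specs)); // prints {$set={status=inactive, age=56}}
  }
}
```

Operators such as `$currentDate` take `true` or a `$type` specification rather than a value from the element, so they would need separate handling. This sketch only covers value-carrying operators like `$set`.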







[GitHub] [beam] pabloem commented on a change in pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #14927:
URL: https://github.com/apache/beam/pull/14927#discussion_r655668926



##########
File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
##########
@@ -106,6 +109,28 @@
  *     .withNumSplits(30))
  *
  * }</pre>
+ *
+ *
+ *
+ * <p>To configure a MongoDB sink and update, you must specify a connection {@code URI}, a {@code
+ * Database} name, and a {@code Collection} name. The update key is matched against _id in the
+ * target collection. For instance:
+ *
+ * <pre>{@code
+ * pipeline
+ *   .apply(...)
+ *   .apply(MongoDbIO.write()
+ *     .withUri("mongodb://localhost:27017")
+ *     .withDatabase("my-database")
+ *     .withCollection("my-collection")
+ *     .withIsUpdate(true)
+ *     .withUpdateKey("key-to-match")
+ *     .withUpdateField("field-to-update")
+ *     .withUpdateOperator("$set")
+ *     .withNumSplits(30))

Review comment:
       This makes sense to me! : )
   
   I suppose that when we update the full document, the field names must be the same?
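One way to read "update all the fields without listing them" is a single `$set` whose keys are the input's own field names, which is why the names would have to match. A minimal sketch under that assumption; `SetAllFieldsSketch` is a hypothetical helper, plain `Map`s stand in for BSON documents, and `_id` is skipped because MongoDB treats it as immutable.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: $set every top-level input field, keeping the input's field names. */
final class SetAllFieldsSketch {
  static Map<String, Object> setAll(Map<String, Object> input) {
    Map<String, Object> fields = new LinkedHashMap<>();
    for (Map.Entry<String, Object> e : input.entrySet()) {
      // _id cannot be changed by an update, so it is left out of the $set.
      if (!"_id".equals(e.getKey())) {
        fields.put(e.getKey(), e.getValue());
      }
    }
    Map<String, Object> update = new LinkedHashMap<>();
    update.put("$set", fields);
    return update;
  }
}
```

Unlike a replace, a `$set` like this keeps fields that exist only in the stored document (such as `lastUpdated` in the earlier example).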







[GitHub] [beam] aromanenko-dev commented on pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on pull request #14927:
URL: https://github.com/apache/beam/pull/14927#issuecomment-896126296


   @pareshsarafmdb Could you rebase against `master`, run `./gradlew :sdks:java:io:mongodb:compileJava` locally and push it if it's ok?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] aromanenko-dev commented on pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on pull request #14927:
URL: https://github.com/apache/beam/pull/14927#issuecomment-895838433


   @pareshsarafmdb No worries, it's just a command to run the `Java PreCommit` job. Actually, it fails (you can see the results by clicking "Details" next to the `Java ("Run Java PreCommit")` check): 
   
   ```
   20:47:11 > Task :sdks:java:io:mongodb:compileJava
   20:47:11 /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_Phrase/src/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/UpdateField.java:43: error: [argument.type.incompatible] incompatible argument for parameter sourceField of UpdateField.
   20:47:11     return new UpdateField(updateOperator, null, destField);
   20:47:11                                            ^
   20:47:11   found   : null (NullType)
   20:47:11   required: @Initialized @NonNull String
   ```
   
   I'd recommend running `./gradlew :sdks:java:io:mongodb:check` locally before pushing your changes to make sure that everything is fine. 





[GitHub] [beam] pareshsarafmdb commented on pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
pareshsarafmdb commented on pull request #14927:
URL: https://github.com/apache/beam/pull/14927#issuecomment-909888653


   > > Just added minor stylistic comments.
   > > I still wonder: What happens when users want to update the whole old record into the whole new record?
   > > Finally, I have one more question about a later improvement to consider for this IO - is it worth thinking later about passing a lambda to choose the destination collection?
   > 
   > @pabloem If I understand correctly, you're asking what happens if the user wants to update a destination document with the entire source document. In that case there is an overloaded method that takes just the update operator and the destination field; it does a full-document update. Does that make sense?
   
   Yes, that's right.
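As described in the quoted reply, the two-argument form (update operator plus destination field) uses the entire input document as the value. A minimal sketch of the resulting update document, assuming that behaviour; `FullUpdateSketch` is hypothetical and plain `Map`s stand in for BSON documents.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of a "full document" update: the whole input becomes one field's value. */
final class FullUpdateSketch {
  static Map<String, Object> fullUpdate(
      String operator, String destField, Map<String, Object> input) {
    // Copy the input so later changes to it do not leak into the update document.
    Map<String, Object> value = new LinkedHashMap<>(input);
    Map<String, Object> sub = new LinkedHashMap<>();
    sub.put(destField, value);
    Map<String, Object> update = new LinkedHashMap<>();
    update.put(operator, sub);
    return update;
  }
}
```

With `$push` and `history`, an element `{ "status": "inactive" }` would become `{ "$push": { "history": { "status": "inactive" } } }`, appending a snapshot of the element to an array field.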





[GitHub] [beam] pareshsarafmdb commented on pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
pareshsarafmdb commented on pull request #14927:
URL: https://github.com/apache/beam/pull/14927#issuecomment-895903270


   > @pareshsarafmdb No worries, it's just a command to run `Java PreCommit` job. Actually, it fails (you can see the results by clicking on "Details" against `Java ("Run Java PreCommit")` check):
   > 
   > ```
   > 20:47:11 > Task :sdks:java:io:mongodb:compileJava
   > 20:47:11 /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_Phrase/src/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/UpdateField.java:43: error: [argument.type.incompatible] incompatible argument for parameter sourceField of UpdateField.
   > 20:47:11     return new UpdateField(updateOperator, null, destField);
   > 20:47:11                                            ^
   > 20:47:11   found   : null (NullType)
   > 20:47:11   required: @Initialized @NonNull String
   > ```
   > 
   > I'd recommend to run this command `./gradlew :sdks:java:io:mongodb:check` locally before pushing your changes to make sure that everything is fine.
   
   I have done that and verified it again. The build has no issues locally:
   BUILD SUCCESSFUL in 3m 4s
   58 actionable tasks: 58 executed





[GitHub] [beam] pareshsarafmdb commented on a change in pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
pareshsarafmdb commented on a change in pull request #14927:
URL: https://github.com/apache/beam/pull/14927#discussion_r689484235



##########
File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/UpdateField.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import java.io.Serializable;
+
+public class UpdateField implements Serializable {
+
+  private String updateOperator;
+
+  private String sourceField;
+
+  private String destField;
+
+  public UpdateField(String updateOperator, String sourceField, String destField) {
+    this.updateOperator = updateOperator;
+    this.sourceField = sourceField;
+    this.destField = destField;
+  }
+
+  /** for updating field by field. */
+  public static UpdateField of(String updateOperator, String sourceField, String destField) {
+    return new UpdateField(updateOperator, sourceField, destField);
+  }
+
+  /** for updating with entire input document. */
+  public static UpdateField of(String updateOperator, String destField) {
+    return new UpdateField(updateOperator, null, destField);

Review comment:
       @pabloem I have changed it to an AutoValue class with two methods, `fieldUpdate` and `fullUpdate`. Example below:
   MongoDbIO.write()
       .withUri("mongodb://localhost:" + port)
       .withDatabase(DATABASE)
       .withCollection(collectionName)
       .withUpdateConfiguration(
           UpdateConfiguration.create()
               .withUpdateKey("id")
               .withUpdateFields(
                   UpdateField.create().fieldUpdate("$set", "scientist", "scientist"),
                   UpdateField.create().fieldUpdate("$set", "country", "country")));
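The Javadoc in this PR says the update key is matched against `_id` in the target collection. A minimal sketch of building that filter from one input element, assuming a plain equality match; `UpdateFilterSketch` is hypothetical, and rejecting an element with no key value is this sketch's own choice, not necessarily the connector's.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: filter {"_id": <value of updateKey in the input>}. */
final class UpdateFilterSketch {
  static Map<String, Object> filterFor(Map<String, Object> input, String updateKey) {
    Object key = input.get(updateKey);
    if (key == null) {
      // A missing key would yield {_id: null}, which is almost certainly not intended.
      throw new IllegalArgumentException("Input has no value for update key: " + updateKey);
    }
    Map<String, Object> filter = new LinkedHashMap<>();
    filter.put("_id", key);
    return filter;
  }
}
```

With the MongoDB Java driver, such a filter and an update document could be wrapped in an `UpdateOneModel` and sent with `MongoCollection.bulkWrite`; that is one plausible wiring, not a description of the merged code.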







[GitHub] [beam] pabloem commented on pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #14927:
URL: https://github.com/apache/beam/pull/14927#issuecomment-896229645


   ... also sorry about the delay! : )





[GitHub] [beam] pareshsarafmdb commented on a change in pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
pareshsarafmdb commented on a change in pull request #14927:
URL: https://github.com/apache/beam/pull/14927#discussion_r699877276



##########
File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/UpdateField.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@Experimental(Kind.SOURCE_SINK)
+@AutoValue
+public abstract class UpdateField implements Serializable {
+
+  abstract @Nullable String updateOperator();
+
+  abstract @Nullable String sourceField();
+
+  abstract @Nullable String destField();
+
+  private static Builder builder() {
+    return new AutoValue_UpdateField.Builder().setSourceField(null);
+  }
+
+  abstract UpdateField.Builder toBuilder();
+
+  public static UpdateField create() {
+    return builder().build();
+  }

Review comment:
       I have made the change. 







[GitHub] [beam] pabloem commented on pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #14927:
URL: https://github.com/apache/beam/pull/14927#issuecomment-908600217


   The test issues are unrelated (at least the Java Examples failures). This is on me to review. Sorry about the delay again.





[GitHub] [beam] pareshsarafmdb commented on a change in pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
pareshsarafmdb commented on a change in pull request #14927:
URL: https://github.com/apache/beam/pull/14927#discussion_r699877102



##########
File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
##########
@@ -106,6 +112,27 @@
  *     .withNumSplits(30))
  *
  * }</pre>
+ *
+ *
+ *
+ * <p>To configure a MongoDB sink and update, you must specify a connection {@code URI}, a {@code
+ * Database} name, and a {@code Collection} name. The update key is matched against _id in the
+ * target collection. For instance:
+ *
+ * <pre>{@code
+ * pipeline
+ *   .apply(...)
+ *   .apply(MongoDbIO.write()
+ *     .withUri("mongodb://localhost:27017")
+ *     .withDatabase("my-database")
+ *     .withCollection("my-collection")
+ *     .withUpdateConfiguration(UpdateConfiguration.create().withUpdateKey("key1")
+ *     .withUpdateFields(UpdateField.of("$set", "source-field1", "dest-field1"),
+ *                       UpdateField.of("$set", "source-field2", "dest-field2"),
+ *                       // pushes the entire input document to the dest field
+ *                       UpdateField.of("$push", "dest-field3"))));

Review comment:
       I have done it. 







[GitHub] [beam] aromanenko-dev commented on a change in pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on a change in pull request #14927:
URL: https://github.com/apache/beam/pull/14927#discussion_r668703561



##########
File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
##########
@@ -910,15 +973,24 @@ public void startBundle() {
       public void processElement(ProcessContext ctx) {
         // Need to copy the document because mongoCollection.insertMany() will mutate it
         // before inserting (will assign an id).
+
         batch.add(new Document(ctx.element()));
         if (batch.size() >= spec.batchSize()) {
-          flush();
+          if (spec.isUpdate()) {

Review comment:
       @pareshsarafmdb Thanks, let me take a look one more time. 
   @pabloem Do you have any comments on this?
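The hunk above buffers copies of incoming documents and, once `batchSize` is reached, branches on `spec.isUpdate()`. A simplified sketch of that buffer-and-dispatch pattern, with the MongoDB calls replaced by callbacks; `BatchingWriterSketch` and its constructor parameters are hypothetical, and flushing the remainder at bundle end is assumed here rather than shown in the hunk.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical sketch of a batching writer that routes full batches to insert or update. */
final class BatchingWriterSketch {
  private final int batchSize;
  private final boolean isUpdate;
  private final Consumer<List<Map<String, Object>>> insertMany; // stand-in for the insert path
  private final Consumer<List<Map<String, Object>>> updateMany; // stand-in for the update path
  private final List<Map<String, Object>> batch = new ArrayList<>();

  BatchingWriterSketch(
      int batchSize,
      boolean isUpdate,
      Consumer<List<Map<String, Object>>> insertMany,
      Consumer<List<Map<String, Object>>> updateMany) {
    this.batchSize = batchSize;
    this.isUpdate = isUpdate;
    this.insertMany = insertMany;
    this.updateMany = updateMany;
  }

  void processElement(Map<String, Object> element) {
    // Copy the element: the insert path may mutate it (e.g. by assigning an _id).
    batch.add(new LinkedHashMap<>(element));
    if (batch.size() >= batchSize) {
      flush();
    }
  }

  /** Writes whatever is left over when the bundle ends. */
  void finishBundle() {
    flush();
  }

  private void flush() {
    if (batch.isEmpty()) {
      return;
    }
    List<Map<String, Object>> toWrite = new ArrayList<>(batch);
    if (isUpdate) {
      updateMany.accept(toWrite);
    } else {
      insertMany.accept(toWrite);
    }
    batch.clear();
  }
}
```

With `batchSize = 2` and three elements, the update callback would see one batch of two and, at bundle end, one batch of one.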








[GitHub] [beam] pareshsarafmdb commented on a change in pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
pareshsarafmdb commented on a change in pull request #14927:
URL: https://github.com/apache/beam/pull/14927#discussion_r699846274



##########
File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/UpdateField.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@Experimental(Kind.SOURCE_SINK)
+@AutoValue
+public abstract class UpdateField implements Serializable {
+
+  abstract @Nullable String updateOperator();
+
+  abstract @Nullable String sourceField();
+
+  abstract @Nullable String destField();
+
+  private static Builder builder() {
+    return new AutoValue_UpdateField.Builder().setSourceField(null);
+  }
+
+  abstract UpdateField.Builder toBuilder();
+
+  public static UpdateField create() {
+    return builder().build();
+  }
+
+  @AutoValue.Builder
+  abstract static class Builder {
+    abstract UpdateField.Builder setUpdateOperator(@Nullable String updateOperator);
+
+    abstract UpdateField.Builder setSourceField(@Nullable String sourceField);
+
+    abstract UpdateField.Builder setDestField(@Nullable String destField);
+
+    abstract UpdateField build();
+  }
+
+  /** Uses the entire input document as the value of the destination field. */
+  public UpdateField fullUpdate(String updateOperator, String destField) {
+    return toBuilder().setUpdateOperator(updateOperator).setDestField(destField).build();
+  }

Review comment:
       Yes, that's right. That's intended. 
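AutoValue generates the immutable implementation and builder for the `UpdateField` class in the diff above. A hand-written approximation may make the behaviour easier to see: every method returns a new instance, and `fullUpdate` keeps `sourceField` as it was, which is `null` after `create()`. `UpdateFieldValue` is a hypothetical name, and it omits the `equals`/`hashCode` that AutoValue would also generate.

```java
import java.io.Serializable;

/** Hypothetical hand-written stand-in for the AutoValue-generated UpdateField. */
final class UpdateFieldValue implements Serializable {
  private static final long serialVersionUID = 1L;

  private final String updateOperator; // null until an update method sets it
  private final String sourceField; // null means "use the whole input document"
  private final String destField;

  private UpdateFieldValue(String updateOperator, String sourceField, String destField) {
    this.updateOperator = updateOperator;
    this.sourceField = sourceField;
    this.destField = destField;
  }

  /** Mirrors create(): every property starts out null. */
  static UpdateFieldValue create() {
    return new UpdateFieldValue(null, null, null);
  }

  String updateOperator() {
    return updateOperator;
  }

  String sourceField() {
    return sourceField;
  }

  String destField() {
    return destField;
  }

  /** Copies a single source field into destField. */
  UpdateFieldValue fieldUpdate(String operator, String source, String dest) {
    return new UpdateFieldValue(operator, source, dest);
  }

  /** Like toBuilder() in the diff: sets operator and dest, keeps the current sourceField. */
  UpdateFieldValue fullUpdate(String operator, String dest) {
    return new UpdateFieldValue(operator, sourceField, dest);
  }
}
```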







[GitHub] [beam] pabloem merged pull request #14927: [BEAM-12400] MongoDBIO support for update within documents

Posted by GitBox <gi...@apache.org>.
pabloem merged pull request #14927:
URL: https://github.com/apache/beam/pull/14927


   

