Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/12/13 04:13:31 UTC

[GitHub] [beam] thiagotnunes opened a new pull request #16203: [BEAM-12164] Add Spanner Change Stream Mappers

thiagotnunes opened a new pull request #16203:
URL: https://github.com/apache/beam/pull/16203


   The mapper classes convert from Cloud Spanner Structs to the change stream models used by the connector. There are two mappers implemented:
   
   1. For mapping to partition metadata models.
   2. For mapping to change stream records (one of heartbeat, data or child partitions).
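The record mapper's core job (as in `toChangeStreamRecords` in `ChangeStreamRecordMapper`) is to flatten each row's list of change records into one list of typed models. Each change record carries `data_change_record`, `heartbeat_record` and `child_partitions_record` columns. Below is a minimal sketch of that dispatch. It assumes a Spanner `Struct` can be modeled as a `Map<String, Object>` and timestamps as `long`. The class names (`ChangeStreamMappingSketch`, `StreamRecord`, `DataChange`, `Heartbeat`, `ChildPartitions`) are hypothetical. Treating an entry whose timestamp is null as absent is also an assumption, not the connector's confirmed behavior. Only the column names are taken from the mapper.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Sketch: flattening change stream rows into typed records, with a Struct modeled as a Map. */
public class ChangeStreamMappingSketch {

  // Hypothetical, simplified stand-ins for the connector's model classes.
  interface StreamRecord {}

  static final class DataChange implements StreamRecord {
    final long commitTimestamp;
    final String tableName;

    DataChange(long commitTimestamp, String tableName) {
      this.commitTimestamp = commitTimestamp;
      this.tableName = tableName;
    }
  }

  static final class Heartbeat implements StreamRecord {
    final long timestamp;

    Heartbeat(long timestamp) {
      this.timestamp = timestamp;
    }
  }

  static final class ChildPartitions implements StreamRecord {
    final long startTimestamp;
    final List<String> tokens;

    ChildPartitions(long startTimestamp, List<String> tokens) {
      this.startTimestamp = startTimestamp;
      this.tokens = tokens;
    }
  }

  /** Builds a struct-like map from alternating column names and values. */
  static Map<String, Object> struct(Object... columnsAndValues) {
    Map<String, Object> struct = new HashMap<>();
    for (int i = 0; i < columnsAndValues.length; i += 2) {
      struct.put((String) columnsAndValues[i], columnsAndValues[i + 1]);
    }
    return struct;
  }

  /** Reads a list-of-structs column, treating a missing column as an empty list. */
  @SuppressWarnings("unchecked")
  static List<Map<String, Object>> structList(Map<String, Object> struct, String column) {
    Object value = struct.get(column);
    return value == null
        ? Collections.<Map<String, Object>>emptyList()
        : (List<Map<String, Object>>) value;
  }

  /** Flattens every change record of a row into a single list of typed records. */
  static List<StreamRecord> toRecords(List<Map<String, Object>> changeRecords) {
    return changeRecords.stream()
        .flatMap(ChangeStreamMappingSketch::toRecord)
        .collect(Collectors.toList());
  }

  /** Maps the three record columns, skipping entries whose timestamp is null (assumption). */
  static Stream<StreamRecord> toRecord(Map<String, Object> changeRecord) {
    Stream<StreamRecord> dataChanges =
        structList(changeRecord, "data_change_record").stream()
            .filter(s -> s.get("commit_timestamp") != null)
            .<StreamRecord>map(
                s -> new DataChange((Long) s.get("commit_timestamp"), (String) s.get("table_name")));
    Stream<StreamRecord> heartbeats =
        structList(changeRecord, "heartbeat_record").stream()
            .filter(s -> s.get("timestamp") != null)
            .<StreamRecord>map(s -> new Heartbeat((Long) s.get("timestamp")));
    Stream<StreamRecord> childPartitions =
        structList(changeRecord, "child_partitions_record").stream()
            .filter(s -> s.get("start_timestamp") != null)
            .<StreamRecord>map(
                s ->
                    new ChildPartitions(
                        (Long) s.get("start_timestamp"),
                        structList(s, "child_partitions").stream()
                            .map(child -> (String) child.get("token"))
                            .collect(Collectors.toList())));
    // Per change record, emit data changes, then heartbeats, then child partitions.
    return Stream.concat(dataChanges, Stream.concat(heartbeats, childPartitions));
  }
}
```

Because each change record is mapped to a `Stream` and the row is combined with `flatMap`, a single row can yield any number of records of mixed types. The ordering within a change record is a choice made for this sketch only.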
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] thiagotnunes commented on a change in pull request #16203: [BEAM-12164] Add Spanner Change Stream Mappers

Posted by GitBox <gi...@apache.org>.
thiagotnunes commented on a change in pull request #16203:
URL: https://github.com/apache/beam/pull/16203#discussion_r780039264



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.Struct;
+import java.util.HashSet;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamResultSetMetadata;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChangeStreamRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChangeStreamRecordMetadata;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartition;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ColumnType;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ModType;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.TypeCode;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ValueCaptureType;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+
+/**
+ * This class is responsible for transforming a {@link Struct} to a {@link List} of {@link
+ * ChangeStreamRecord} models.
+ */
+public class ChangeStreamRecordMapper {
+
+  private static final String DATA_CHANGE_RECORD_COLUMN = "data_change_record";
+  private static final String HEARTBEAT_RECORD_COLUMN = "heartbeat_record";
+  private static final String CHILD_PARTITIONS_RECORD_COLUMN = "child_partitions_record";
+
+  private static final String COMMIT_TIMESTAMP_COLUMN = "commit_timestamp";
+  private static final String SERVER_TRANSACTION_ID_COLUMN = "server_transaction_id";
+  private static final String IS_LAST_RECORD_IN_TRANSACTION_IN_PARTITION_COLUMN =
+      "is_last_record_in_transaction_in_partition";
+  private static final String RECORD_SEQUENCE_COLUMN = "record_sequence";
+  private static final String TABLE_NAME_COLUMN = "table_name";
+  private static final String COLUMN_TYPES_COLUMN = "column_types";
+  private static final String MODS_COLUMN = "mods";
+  private static final String MOD_TYPE_COLUMN = "mod_type";
+  private static final String VALUE_CAPTURE_TYPE_COLUMN = "value_capture_type";
+  private static final String NUMBER_OF_RECORDS_IN_TRANSACTION_COLUMN =
+      "number_of_records_in_transaction";
+  private static final String NUMBER_OF_PARTITIONS_IN_TRANSACTION_COLUMN =
+      "number_of_partitions_in_transaction";
+  private static final String NAME_COLUMN = "name";
+  private static final String TYPE_COLUMN = "type";
+  private static final String IS_PRIMARY_KEY_COLUMN = "is_primary_key";
+  private static final String ORDINAL_POSITION_COLUMN = "ordinal_position";
+  private static final String KEYS_COLUMN = "keys";
+  private static final String OLD_VALUES_COLUMN = "old_values";
+  private static final String NEW_VALUES_COLUMN = "new_values";
+
+  private static final String TIMESTAMP_COLUMN = "timestamp";
+
+  private static final String START_TIMESTAMP_COLUMN = "start_timestamp";
+  private static final String CHILD_PARTITIONS_COLUMN = "child_partitions";
+  private static final String PARENT_PARTITION_TOKENS_COLUMN = "parent_partition_tokens";
+  private static final String TOKEN_COLUMN = "token";
+
+  ChangeStreamRecordMapper() {}
+
+  /**
+   * Transforms a {@link Struct} representing a change stream result into a {@link List} of {@link
+   * ChangeStreamRecord} models. The type of each change stream record is identified, and any of the
+   * following subclasses may be returned within the resulting {@link List}:
+   *
+   * <ul>
+   *   <li>{@link DataChangeRecord}
+   *   <li>{@link HeartbeatRecord}
+   *   <li>{@link ChildPartitionsRecord}
+   * </ul>
+   *
+   * <p>In addition to the {@link Struct} received, the originating partition of the records (given
+   * by the {@link PartitionMetadata} parameter) and the stream metadata (given by the {@link
+   * ChangeStreamResultSetMetadata}) are used to populate the {@link ChangeStreamRecordMetadata} for
+   * each record mapped.
+   *
+   * <p>The {@link Struct} is expected to have the following fields:
+   *
+   * <ul>
+   *   <li>{@link ChangeStreamRecordMapper#DATA_CHANGE_RECORD_COLUMN}: non-nullable {@link Struct}
+   *       list of data change records.
+   *       <ul>
+   *         <li>{@link ChangeStreamRecordMapper#COMMIT_TIMESTAMP_COLUMN}: non-nullable {@link
+   *             Timestamp} representing the timestamp at which the modifications within the record
+   *             were committed in Cloud Spanner.
+   *         <li>{@link ChangeStreamRecordMapper#SERVER_TRANSACTION_ID_COLUMN}: non-nullable {@link
+   *             String} representing the unique transaction id in which the modifications for this
+   *             record occurred.
+   *         <li>{@link ChangeStreamRecordMapper#IS_LAST_RECORD_IN_TRANSACTION_IN_PARTITION_COLUMN}:
+   *             non-nullable {@link Boolean} indicating whether this record is the last one emitted
+   *             for the transaction in the partition.
+   *         <li>{@link ChangeStreamRecordMapper#RECORD_SEQUENCE_COLUMN}: non-nullable {@link
+   *             String} representing the order in which this record appears within the context of a
+   *             partition, commit timestamp and transaction.
+   *         <li>{@link ChangeStreamRecordMapper#TABLE_NAME_COLUMN}: non-nullable {@link String}
+   *             representing the name of the table in which the modifications for this record
+   *             occurred.
+   *         <li>{@link ChangeStreamRecordMapper#COLUMN_TYPES_COLUMN}: non-nullable {@link List} of
+   *             {@link Struct}s representing the type of the primary keys and modified columns
+   *             within this record.
+   *             <ul>
+   *               <li>{@link ChangeStreamRecordMapper#NAME_COLUMN}: non-nullable {@link String}
+   *                   representing the name of a column.
+   *               <li>{@link ChangeStreamRecordMapper#TYPE_COLUMN}: non-nullable {@link String}
+   *                   representing the type of a column.
+   *               <li>{@link ChangeStreamRecordMapper#IS_PRIMARY_KEY_COLUMN}: non-nullable {@link
+   *                   Boolean} indicating if the column is part of the primary key.
+   *               <li>{@link ChangeStreamRecordMapper#ORDINAL_POSITION_COLUMN}: non-nullable {@link
+   *                   Long} representing the position of the column in the table in which it is
+   *                   defined.
+   *             </ul>
+   *         <li>{@link ChangeStreamRecordMapper#MODS_COLUMN}: non-nullable {@link List} of {@link
+   *             Struct}s representing the data modifications within this record.
+   *             <ul>
+   *               <li>{@link ChangeStreamRecordMapper#KEYS_COLUMN}: non-nullable {@link String}
+   *                   JSON object, where keys are the primary key column names, and the values are
+   *                   their corresponding values.
+   *               <li>{@link ChangeStreamRecordMapper#OLD_VALUES_COLUMN}: nullable {@link String}
+   *                   JSON object displaying the old state of the columns modified, where keys are
+   *                   the column names, and the values are their corresponding values.
+   *               <li>{@link ChangeStreamRecordMapper#NEW_VALUES_COLUMN}: nullable {@link String}
+   *                   JSON object displaying the new state of the columns modified, where keys are
+   *                   the column names, and the values are their corresponding values.
+   *             </ul>
+   *         <li>{@link ChangeStreamRecordMapper#MOD_TYPE_COLUMN}: non-nullable {@link String}
+   *             representing the type of operation that caused the modifications (see also {@link
+   *             ModType}).
+   *         <li>{@link ChangeStreamRecordMapper#VALUE_CAPTURE_TYPE_COLUMN}: non-nullable {@link
+   *             String} representing the capture type of the change stream that generated this
+   *             record (see also {@link ValueCaptureType}).
+   *         <li>{@link ChangeStreamRecordMapper#NUMBER_OF_RECORDS_IN_TRANSACTION_COLUMN}:
+   *             non-nullable {@link Long} representing the total number of data change records for
+   *             the transaction in which this record occurred.
+   *         <li>{@link ChangeStreamRecordMapper#NUMBER_OF_PARTITIONS_IN_TRANSACTION_COLUMN}:
+   *             non-nullable {@link Long} representing the total number of partitions for the
+   *             transaction in which this record occurred.
+   *       </ul>
+   *   <li>{@link ChangeStreamRecordMapper#HEARTBEAT_RECORD_COLUMN}: non-nullable {@link Struct}
+   *       list of heartbeat records.
+   *       <ul>
+   *         <li>{@link ChangeStreamRecordMapper#TIMESTAMP_COLUMN}: non-nullable {@link Timestamp}
+   *             representing the timestamp for which the change stream query has returned all
+   *             changes (see more in {@link HeartbeatRecord#getTimestamp()}).
+   *       </ul>
+   *   <li>{@link ChangeStreamRecordMapper#CHILD_PARTITIONS_RECORD_COLUMN}: non-nullable {@link
+   *       Struct} list of child partitions records.
+   *       <ul>
+   *         <li>{@link ChangeStreamRecordMapper#START_TIMESTAMP_COLUMN}: non-nullable {@link
+   *             Timestamp} representing the timestamp at which the new partition started being
+   *             valid in Cloud Spanner.
+   *         <li>{@link ChangeStreamRecordMapper#RECORD_SEQUENCE_COLUMN}: non-nullable {@link
+   *             String} representing the order in which this record appears within the context of a
+   *             partition and commit timestamp.
+   *         <li>{@link ChangeStreamRecordMapper#CHILD_PARTITIONS_COLUMN}: non-nullable {@link List}
+   *             of {@link Struct} representing the new child partitions.
+   *             <ul>
+   *               <li>{@link ChangeStreamRecordMapper#TOKEN_COLUMN}: non-nullable {@link String}
+   *                   representing the unique identifier of the new child partition.
+   *               <li>{@link ChangeStreamRecordMapper#PARENT_PARTITION_TOKENS_COLUMN}: non-nullable
+   *                   {@link List} of {@link String} representing the unique identifier(s) of
+   *                   parent partition(s) from which this child partition originated.
+   *             </ul>
+   *       </ul>
+   * </ul>
+   *
+   * @param partition the partition metadata from which the row was generated
+   * @param row the struct row, representing a single change stream result (it may contain multiple
+   *     change stream records within)
+   * @param resultSetMetadata the metadata generated when reading the change stream row
+   * @return a {@link List} of {@link ChangeStreamRecord} subclasses
+   */
+  public List<ChangeStreamRecord> toChangeStreamRecords(
+      PartitionMetadata partition, Struct row, ChangeStreamResultSetMetadata resultSetMetadata) {
+    return row.getStructList(0).stream()
+        .flatMap(struct -> toChangeStreamRecord(partition, struct, resultSetMetadata))
+        .collect(Collectors.toList());
+  }
+
+  private Stream<ChangeStreamRecord> toChangeStreamRecord(
+      PartitionMetadata partition, Struct row, ChangeStreamResultSetMetadata resultSetMetadata) {
+
+    final Stream<DataChangeRecord> dataChangeRecords =
+        row.getStructList(DATA_CHANGE_RECORD_COLUMN).stream()
+            .filter(this::isNonNullDataChangeRecord)
+            .map(struct -> toDataChangeRecord(partition, struct, resultSetMetadata));
+
+    final Stream<HeartbeatRecord> heartbeatRecords =
+        row.getStructList(HEARTBEAT_RECORD_COLUMN).stream()
+            .filter(this::isNonNullHeartbeatRecord)
+            .map(struct -> toHeartbeatRecord(partition, struct, resultSetMetadata));
+
+    final Stream<ChildPartitionsRecord> childPartitionsRecords =
+        row.getStructList(CHILD_PARTITIONS_RECORD_COLUMN).stream()
+            .filter(this::isNonNullChildPartitionsRecord)
+            .map(struct -> toChildPartitionsRecord(partition, struct, resultSetMetadata));
+
+    return Stream.concat(
+        Stream.concat(dataChangeRecords, heartbeatRecords), childPartitionsRecords);

Review comment:
       That is a good point; I think this might affect the watermark. I will change this.
   
   Thanks for pointing this out.
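The ordering concern above follows from how `toChangeStreamRecords` flattens each row: it concatenates the data change, heartbeat and child partitions streams in that fixed order. The following is a minimal sketch of that flatten-and-concat pattern. It is not connector code. `ConcatOrderSketch`, `Row` and the string record labels are hypothetical stand-ins for the Spanner `Struct` rows and the `ChangeStreamRecord` models.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical sketch of the flatten-and-concat pattern; not the connector's implementation. */
class ConcatOrderSketch {

  /** Hypothetical stand-in for one change stream row holding three lists of records. */
  static final class Row {
    final List<String> dataChanges;
    final List<String> heartbeats;
    final List<String> childPartitions;

    Row(List<String> dataChanges, List<String> heartbeats, List<String> childPartitions) {
      this.dataChanges = dataChanges;
      this.heartbeats = heartbeats;
      this.childPartitions = childPartitions;
    }
  }

  /** Concatenation of sequential streams keeps encounter order: data, heartbeat, child. */
  static Stream<String> toRecords(Row row) {
    return Stream.concat(
        Stream.concat(row.dataChanges.stream(), row.heartbeats.stream()),
        row.childPartitions.stream());
  }

  /** Flattens every row into a single list, preserving row order. */
  static List<String> toAllRecords(List<Row> rows) {
    return rows.stream().flatMap(ConcatOrderSketch::toRecords).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Row> rows =
        List.of(
            new Row(List.of("data@10", "data@11"), List.of("heartbeat@12"), List.of()),
            new Row(List.of("data@13"), List.of(), List.of("child@14")));
    // prints [data@10, data@11, heartbeat@12, data@13, child@14]
    System.out.println(toAllRecords(rows));
  }
}
```

Within a row, the record kind alone determines the emitted order, not the timestamps. That is why the order matters for anything downstream that derives a watermark from record timestamps.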




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] thiagotnunes commented on a change in pull request #16203: [BEAM-12164] Add Spanner Change Stream Mappers

Posted by GitBox <gi...@apache.org>.
thiagotnunes commented on a change in pull request #16203:
URL: https://github.com/apache/beam/pull/16203#discussion_r770225805



##########
File path: sdks/java/io/google-cloud-platform/build.gradle
##########
@@ -131,6 +131,7 @@ dependencies {
   compile library.java.arrow_memory_core
   compile library.java.arrow_vector
 
+  compile "io.opencensus:opencensus-api:0.28.0"

Review comment:
       This version introduces the log4shell vulnerability. It should be updated to the latest version.







[GitHub] [beam] pabloem commented on pull request #16203: [BEAM-12164] Add Spanner Change Stream Mappers

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #16203:
URL: https://github.com/apache/beam/pull/16203#issuecomment-1007152147


   @thiagotnunes please take a look at the comments : )





[GitHub] [beam] thiagotnunes commented on a change in pull request #16203: [BEAM-12164] Add Spanner Change Stream Mappers

Posted by GitBox <gi...@apache.org>.
thiagotnunes commented on a change in pull request #16203:
URL: https://github.com/apache/beam/pull/16203#discussion_r780042786



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.Struct;
+import java.util.HashSet;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamResultSetMetadata;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChangeStreamRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChangeStreamRecordMetadata;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartition;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ColumnType;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ModType;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.TypeCode;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ValueCaptureType;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+
+/**
+ * This class is responsible for transforming a {@link Struct} to a {@link List} of {@link
+ * ChangeStreamRecord} models.
+ */
+public class ChangeStreamRecordMapper {
+
+  private static final String DATA_CHANGE_RECORD_COLUMN = "data_change_record";
+  private static final String HEARTBEAT_RECORD_COLUMN = "heartbeat_record";
+  private static final String CHILD_PARTITIONS_RECORD_COLUMN = "child_partitions_record";
+
+  private static final String COMMIT_TIMESTAMP_COLUMN = "commit_timestamp";
+  private static final String SERVER_TRANSACTION_ID_COLUMN = "server_transaction_id";
+  private static final String IS_LAST_RECORD_IN_TRANSACTION_IN_PARTITION_COLUMN =
+      "is_last_record_in_transaction_in_partition";
+  private static final String RECORD_SEQUENCE_COLUMN = "record_sequence";
+  private static final String TABLE_NAME_COLUMN = "table_name";
+  private static final String COLUMN_TYPES_COLUMN = "column_types";
+  private static final String MODS_COLUMN = "mods";
+  private static final String MOD_TYPE_COLUMN = "mod_type";
+  private static final String VALUE_CAPTURE_TYPE_COLUMN = "value_capture_type";
+  private static final String NUMBER_OF_RECORDS_IN_TRANSACTION_COLUMN =
+      "number_of_records_in_transaction";
+  private static final String NUMBER_OF_PARTITIONS_IN_TRANSACTION_COLUMN =
+      "number_of_partitions_in_transaction";
+  private static final String NAME_COLUMN = "name";
+  private static final String TYPE_COLUMN = "type";
+  private static final String IS_PRIMARY_KEY_COLUMN = "is_primary_key";
+  private static final String ORDINAL_POSITION_COLUMN = "ordinal_position";
+  private static final String KEYS_COLUMN = "keys";
+  private static final String OLD_VALUES_COLUMN = "old_values";
+  private static final String NEW_VALUES_COLUMN = "new_values";
+
+  private static final String TIMESTAMP_COLUMN = "timestamp";
+
+  private static final String START_TIMESTAMP_COLUMN = "start_timestamp";
+  private static final String CHILD_PARTITIONS_COLUMN = "child_partitions";
+  private static final String PARENT_PARTITION_TOKENS_COLUMN = "parent_partition_tokens";
+  private static final String TOKEN_COLUMN = "token";
+
+  ChangeStreamRecordMapper() {}
+
+  /**
+   * Transforms a {@link Struct} representing a change stream result into a {@link List} of {@link
+   * ChangeStreamRecord} models. The type of each change stream record is identified, and one of
+   * the following subclasses can be returned within the resulting {@link List}:
+   *
+   * <ul>
+   *   <li>{@link DataChangeRecord}
+   *   <li>{@link HeartbeatRecord}
+   *   <li>{@link ChildPartitionsRecord}
+   * </ul>
+   *
+   * <p>In addition to the {@link Struct} received, the originating partition of the records
+   * (given by the {@link PartitionMetadata} parameter) and the stream metadata (given by the {@link
+   * ChangeStreamResultSetMetadata}) are used to populate the {@link ChangeStreamRecordMetadata} for
+   * each record mapped.
+   *
+   * <p>The {@link Struct} is expected to have the following fields:
+   *
+   * <ul>
+   *   <li>{@link ChangeStreamRecordMapper#DATA_CHANGE_RECORD_COLUMN}: non-nullable {@link Struct}
+   *       list of data change records.
+   *       <ul>
+   *         <li>{@link ChangeStreamRecordMapper#COMMIT_TIMESTAMP_COLUMN}: non-nullable {@link
+   *             Timestamp} representing the timestamp at which the modifications within the record
+   *             were committed in Cloud Spanner.
+   *         <li>{@link ChangeStreamRecordMapper#SERVER_TRANSACTION_ID_COLUMN}: non-nullable {@link
+   *             String} representing the unique transaction id in which the modifications for this
+   *             record occurred.
+   *         <li>{@link ChangeStreamRecordMapper#IS_LAST_RECORD_IN_TRANSACTION_IN_PARTITION_COLUMN}:
+   *             non-nullable {@link Boolean} indicating whether this record is the last one emitted
+   *             for the transaction within this partition.
+   *         <li>{@link ChangeStreamRecordMapper#RECORD_SEQUENCE_COLUMN}: non-nullable {@link
+   *             String} representing the order in which this record appears within the context of a
+   *             partition, commit timestamp and transaction.
+   *         <li>{@link ChangeStreamRecordMapper#TABLE_NAME_COLUMN}: non-nullable {@link String}
+   *             representing the name of the table in which the modifications for this record
+   *             occurred.
+   *         <li>{@link ChangeStreamRecordMapper#COLUMN_TYPES_COLUMN}: non-nullable {@link List} of
+   *             {@link Struct}s representing the type of the primary keys and modified columns
+   *             within this record.
+   *             <ul>
+   *               <li>{@link ChangeStreamRecordMapper#NAME_COLUMN}: non-nullable {@link String}
+   *                   representing the name of a column.
+   *               <li>{@link ChangeStreamRecordMapper#TYPE_COLUMN}: non-nullable {@link String}
+   *                   representing the type of a column.
+   *               <li>{@link ChangeStreamRecordMapper#IS_PRIMARY_KEY_COLUMN}: non-nullable {@link
+   *                   Boolean} indicating if the column is part of the primary key.
+   *               <li>{@link ChangeStreamRecordMapper#ORDINAL_POSITION_COLUMN}: non-nullable {@link
+   *                   Long} representing the position of the column in the table in which it
+   *                   is defined.
+   *             </ul>
+   *         <li>{@link ChangeStreamRecordMapper#MODS_COLUMN}: non-nullable {@link List} of {@link
+   *             Struct}s representing the data modifications within this record.
+   *             <ul>
+   *               <li>{@link ChangeStreamRecordMapper#KEYS_COLUMN}: non-nullable {@link String}
+   *                   json object, where keys are the primary key column names, and the values are
+   *                   their corresponding values.
+   *               <li>{@link ChangeStreamRecordMapper#OLD_VALUES_COLUMN}: nullable {@link String}
+   *                   json object displaying the old state of the columns modified, where keys are
+   *                   the column names, and the values are their corresponding values.
+   *               <li>{@link ChangeStreamRecordMapper#NEW_VALUES_COLUMN}: nullable {@link String}
+   *                   json object displaying the new state of the columns modified, where keys are
+   *                   the column names, and the values are their corresponding values.
+   *             </ul>
+   *         <li>{@link ChangeStreamRecordMapper#MOD_TYPE_COLUMN}: non-nullable {@link String}
+   *             representing the type of operation that caused the modifications (see also {@link
+   *             ModType}).
+   *         <li>{@link ChangeStreamRecordMapper#VALUE_CAPTURE_TYPE_COLUMN}: non-nullable {@link
+   *             String} representing the capture type of the change stream that generated this
+   *             record (see also {@link ValueCaptureType}).
+   *         <li>{@link ChangeStreamRecordMapper#NUMBER_OF_RECORDS_IN_TRANSACTION_COLUMN}:
+   *             non-nullable {@link Long} representing the total number of data change records for
+   *             the transaction in which this record occurred.
+   *         <li>{@link ChangeStreamRecordMapper#NUMBER_OF_PARTITIONS_IN_TRANSACTION_COLUMN}:
+   *             non-nullable {@link Long} representing the total number of partitions for the
+   *             transaction in which this record occurred.
+   *       </ul>
+   *   <li>{@link ChangeStreamRecordMapper#HEARTBEAT_RECORD_COLUMN}: non-nullable {@link Struct}
+   *       list of heartbeat records.
+   *       <ul>
+   *         <li>{@link ChangeStreamRecordMapper#TIMESTAMP_COLUMN}: non-nullable {@link Timestamp}
+   *             representing the timestamp for which the change stream query has returned all
+   *             changes (see also {@link HeartbeatRecord#getTimestamp()}).
+   *       </ul>
+   *   <li>{@link ChangeStreamRecordMapper#CHILD_PARTITIONS_RECORD_COLUMN}: non-nullable {@link
+   *       Struct} list of child partitions records.
+   *       <ul>
+   *         <li>{@link ChangeStreamRecordMapper#START_TIMESTAMP_COLUMN}: non-nullable {@link
+   *             Timestamp} representing the timestamp at which the new partition started being
+   *             valid in Cloud Spanner.
+   *         <li>{@link ChangeStreamRecordMapper#RECORD_SEQUENCE_COLUMN}: non-nullable {@link
+   *             String} representing the order in which this record appears within the context of a
+   *             partition and commit timestamp.
+   *         <li>{@link ChangeStreamRecordMapper#CHILD_PARTITIONS_COLUMN}: non-nullable {@link List}
+   *             of {@link Struct} representing the new child partitions.
+   *             <ul>
+   *               <li>{@link ChangeStreamRecordMapper#TOKEN_COLUMN}: non-nullable {@link String}
+   *                   representing the unique identifier of the new child partition.
+   *               <li>{@link ChangeStreamRecordMapper#PARENT_PARTITION_TOKENS_COLUMN}: non-nullable
+   *                   {@link List} of {@link String} representing the unique identifier(s) of
+   *                   parent partition(s) from which this child partition originated.
+   *             </ul>
+   *       </ul>
+   * </ul>
+   *
+   * @param partition the partition metadata from which the row was generated
+   * @param row the struct row, representing a single change stream result (it may contain multiple
+   *     change stream records within)
+   * @param resultSetMetadata the metadata generated when reading the change stream row
+   * @return a {@link List} of {@link ChangeStreamRecord} subclasses
+   */
+  public List<ChangeStreamRecord> toChangeStreamRecords(
+      PartitionMetadata partition, Struct row, ChangeStreamResultSetMetadata resultSetMetadata) {
+    return row.getStructList(0).stream()
+        .flatMap(struct -> toChangeStreamRecord(partition, struct, resultSetMetadata))
+        .collect(Collectors.toList());
+  }
+
+  private Stream<ChangeStreamRecord> toChangeStreamRecord(
+      PartitionMetadata partition, Struct row, ChangeStreamResultSetMetadata resultSetMetadata) {
+
+    final Stream<DataChangeRecord> dataChangeRecords =
+        row.getStructList(DATA_CHANGE_RECORD_COLUMN).stream()
+            .filter(this::isNonNullDataChangeRecord)
+            .map(struct -> toDataChangeRecord(partition, struct, resultSetMetadata));
+
+    final Stream<HeartbeatRecord> heartbeatRecords =
+        row.getStructList(HEARTBEAT_RECORD_COLUMN).stream()
+            .filter(this::isNonNullHeartbeatRecord)
+            .map(struct -> toHeartbeatRecord(partition, struct, resultSetMetadata));
+
+    final Stream<ChildPartitionsRecord> childPartitionsRecords =
+        row.getStructList(CHILD_PARTITIONS_RECORD_COLUMN).stream()
+            .filter(this::isNonNullChildPartitionsRecord)
+            .map(struct -> toChildPartitionsRecord(partition, struct, resultSetMetadata));
+
+    return Stream.concat(
+        Stream.concat(dataChangeRecords, heartbeatRecords), childPartitionsRecords);

Review comment:
       Oh, this is not a problem, because we can't have `ChildPartitionsRecord`s and `HeartbeatRecord`s in the same response.
   
   For responses with `DataChangeRecord`s and `HeartbeatRecord`s, the timestamps of the latter must be greater than those of the former. The same holds for the pair of `DataChangeRecord`s and `ChildPartitionsRecord`s.
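The argument above rests on two properties of a single response. First, heartbeat and child partitions records never appear together. Second, both kinds carry timestamps later than any data change record in the same response. Under those assumptions, emitting data change records first keeps the emitted timestamps non-decreasing. The following hypothetical check, which is not connector code, expresses both properties. Timestamps are simplified to plain `long` values, and `Rec` and `Kind` are illustrative stand-ins for the connector's record models.

```java
import java.util.List;

/** Hypothetical sketch of the per-response ordering invariant; not the connector's code. */
class OrderingInvariantSketch {

  enum Kind {
    DATA_CHANGE,
    HEARTBEAT,
    CHILD_PARTITIONS
  }

  /** Simplified record: a kind plus a timestamp (assumed here to be epoch microseconds). */
  static final class Rec {
    final Kind kind;
    final long timestampMicros;

    Rec(Kind kind, long timestampMicros) {
      this.kind = kind;
      this.timestampMicros = timestampMicros;
    }
  }

  /** True if timestamps never go backwards in emission order. */
  static boolean isNonDecreasing(List<Rec> emitted) {
    for (int i = 1; i < emitted.size(); i++) {
      if (emitted.get(i).timestampMicros < emitted.get(i - 1).timestampMicros) {
        return false;
      }
    }
    return true;
  }

  /** True if a response mixes heartbeat and child partitions records (stated not to happen). */
  static boolean mixesHeartbeatAndChildPartitions(List<Rec> response) {
    boolean hasHeartbeat = response.stream().anyMatch(r -> r.kind == Kind.HEARTBEAT);
    boolean hasChild = response.stream().anyMatch(r -> r.kind == Kind.CHILD_PARTITIONS);
    return hasHeartbeat && hasChild;
  }

  public static void main(String[] args) {
    List<Rec> response =
        List.of(
            new Rec(Kind.DATA_CHANGE, 10),
            new Rec(Kind.DATA_CHANGE, 11),
            new Rec(Kind.HEARTBEAT, 12));
    // prints "true false"
    System.out.println(
        isNonDecreasing(response) + " " + mixesHeartbeatAndChildPartitions(response));
  }
}
```

If the server ever violated these properties, for example with a heartbeat timestamp earlier than a preceding data change record, `isNonDecreasing` would return `false`. In that case the fixed concatenation order would become a watermark concern again.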







[GitHub] [beam] thiagotnunes commented on pull request #16203: [BEAM-12164] Add Spanner Change Stream Mappers

Posted by GitBox <gi...@apache.org>.
thiagotnunes commented on pull request #16203:
URL: https://github.com/apache/beam/pull/16203#issuecomment-993194920


   R: @pabloem





[GitHub] [beam] thiagotnunes commented on a change in pull request #16203: [BEAM-12164] Add Spanner Change Stream Mappers

Posted by GitBox <gi...@apache.org>.
thiagotnunes commented on a change in pull request #16203:
URL: https://github.com/apache/beam/pull/16203#discussion_r780038448



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamMetrics.java
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams;
+
+import java.io.Serializable;
+
+/** Class to aggregate metrics-related functionality. */
+public class ChangeStreamMetrics implements Serializable {

Review comment:
       Yes, it is not used at the moment, but it will be soon.







[GitHub] [beam] thiagotnunes commented on pull request #16203: [BEAM-12164] Add Spanner Change Stream Mappers

Posted by GitBox <gi...@apache.org>.
thiagotnunes commented on pull request #16203:
URL: https://github.com/apache/beam/pull/16203#issuecomment-999265257


   I think the test failures are unrelated (the failure is in `debezium:test`). Is there any way we could re-run those, @pabloem?





[GitHub] [beam] aaltay commented on pull request #16203: [BEAM-12164] Add Spanner Change Stream Mappers

Posted by GitBox <gi...@apache.org>.
aaltay commented on pull request #16203:
URL: https://github.com/apache/beam/pull/16203#issuecomment-1002712954


   Run Java PreCommit





[GitHub] [beam] thiagotnunes commented on a change in pull request #16203: [BEAM-12164] Add Spanner Change Stream Mappers

Posted by GitBox <gi...@apache.org>.
thiagotnunes commented on a change in pull request #16203:
URL: https://github.com/apache/beam/pull/16203#discussion_r780038736



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
##########
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.dao;
+
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics.PARTITION_ID_ATTRIBUTE_LABEL;
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_CREATED_AT;
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_END_TIMESTAMP;
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_FINISHED_AT;
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_HEARTBEAT_MILLIS;
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_PARENT_TOKENS;
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_PARTITION_TOKEN;
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_RUNNING_AT;
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_SCHEDULED_AT;
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_START_TIMESTAMP;
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_STATE;
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_WATERMARK;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.DatabaseClient;
+import com.google.cloud.spanner.Mutation;
+import com.google.cloud.spanner.ResultSet;
+import com.google.cloud.spanner.Statement;
+import com.google.cloud.spanner.Struct;
+import com.google.cloud.spanner.TransactionContext;
+import com.google.cloud.spanner.TransactionRunner;
+import com.google.cloud.spanner.Value;
+import io.opencensus.common.Scope;
+import io.opencensus.trace.AttributeValue;
+import io.opencensus.trace.Tracer;
+import io.opencensus.trace.Tracing;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+
+/** Data access object for the Connector metadata tables. */
+public class PartitionMetadataDao {
+  private static final Tracer TRACER = Tracing.getTracer();
+
+  private final String metadataTableName;
+  private final DatabaseClient databaseClient;
+
+  /**
+   * Constructs a partition metadata DAO object, given the generated name of the metadata table.
+   *
+   * @param metadataTableName the name of the partition metadata table
+   * @param databaseClient the {@link DatabaseClient} to perform queries
+   */
+  PartitionMetadataDao(String metadataTableName, DatabaseClient databaseClient) {
+    this.metadataTableName = metadataTableName;
+    this.databaseClient = databaseClient;
+  }
+
+  /**
+   * Fetches the partition metadata row data for the given partition token.
+   *
+   * @param partitionToken the partition unique identifier
+   * @return the partition metadata row for the given token as a struct, or null if it does not
+   *     exist.
+   */
+  public @Nullable Struct getPartition(String partitionToken) {
+    try (Scope scope = TRACER.spanBuilder("getPartition").setRecordEvents(true).startScopedSpan()) {
+      TRACER
+          .getCurrentSpan()
+          .putAttribute(
+              PARTITION_ID_ATTRIBUTE_LABEL, AttributeValue.stringAttributeValue(partitionToken));
+      try (ResultSet resultSet =
+          databaseClient
+              .singleUse()
+              .executeQuery(
+                  Statement.newBuilder(
+                          "SELECT * FROM "
+                              + metadataTableName
+                              + " WHERE "
+                              + COLUMN_PARTITION_TOKEN
+                              + " = @partition")
+                      .bind("partition")
+                      .to(partitionToken)
+                      .build())) {
+        if (resultSet.next()) {
+          return resultSet.getCurrentRowAsStruct();
+        }
+        return null;

Review comment:
       This should not happen. If it does, we throw an `IllegalStateException` in the caller.
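The reply describes a split of responsibilities. `getPartition` returns `null` when no row exists, and the caller turns that unexpected case into an `IllegalStateException`. The following is a minimal sketch of that pattern under stated assumptions. An in-memory `Map` stands in for the Spanner metadata table. `NullableLookupSketch`, `insert` and `getPartitionOrThrow` are hypothetical names and do not come from the connector.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of a nullable DAO lookup plus a caller that fails fast. */
class NullableLookupSketch {

  // Stand-in for the partition metadata table: partition token -> row payload.
  private final Map<String, String> metadataTable = new HashMap<>();

  /** Hypothetical helper to seed the fake table. */
  void insert(String partitionToken, String row) {
    metadataTable.put(partitionToken, row);
  }

  /** Mirrors the documented contract: the row for the token, or null if it does not exist. */
  String getPartition(String partitionToken) {
    return metadataTable.get(partitionToken);
  }

  /** Hypothetical caller: a missing partition is unexpected, so it is treated as illegal state. */
  String getPartitionOrThrow(String partitionToken) {
    String row = getPartition(partitionToken);
    if (row == null) {
      throw new IllegalStateException(
          "Partition " + partitionToken + " not found in the metadata table");
    }
    return row;
  }

  public static void main(String[] args) {
    NullableLookupSketch dao = new NullableLookupSketch();
    dao.insert("token-1", "row-1");
    System.out.println(dao.getPartitionOrThrow("token-1"));
    try {
      dao.getPartitionOrThrow("missing");
    } catch (IllegalStateException e) {
      System.out.println("caught: " + e.getMessage());
    }
  }
}
```

Keeping the DAO nullable and moving the failure to the caller lets call sites where absence is legitimate still reuse the same lookup.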







[GitHub] [beam] thiagotnunes commented on a change in pull request #16203: [BEAM-12164] Add Spanner Change Stream Mappers

Posted by GitBox <gi...@apache.org>.
thiagotnunes commented on a change in pull request #16203:
URL: https://github.com/apache/beam/pull/16203#discussion_r780042786



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.Struct;
+import java.util.HashSet;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamResultSetMetadata;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChangeStreamRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChangeStreamRecordMetadata;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartition;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ColumnType;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ModType;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.TypeCode;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ValueCaptureType;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+
+/**
+ * This class is responsible for transforming a {@link Struct} to a {@link List} of {@link
+ * ChangeStreamRecord} models.
+ */
+public class ChangeStreamRecordMapper {
+
+  private static final String DATA_CHANGE_RECORD_COLUMN = "data_change_record";
+  private static final String HEARTBEAT_RECORD_COLUMN = "heartbeat_record";
+  private static final String CHILD_PARTITIONS_RECORD_COLUMN = "child_partitions_record";
+
+  private static final String COMMIT_TIMESTAMP_COLUMN = "commit_timestamp";
+  private static final String SERVER_TRANSACTION_ID_COLUMN = "server_transaction_id";
+  private static final String IS_LAST_RECORD_IN_TRANSACTION_IN_PARTITION_COLUMN =
+      "is_last_record_in_transaction_in_partition";
+  private static final String RECORD_SEQUENCE_COLUMN = "record_sequence";
+  private static final String TABLE_NAME_COLUMN = "table_name";
+  private static final String COLUMN_TYPES_COLUMN = "column_types";
+  private static final String MODS_COLUMN = "mods";
+  private static final String MOD_TYPE_COLUMN = "mod_type";
+  private static final String VALUE_CAPTURE_TYPE_COLUMN = "value_capture_type";
+  private static final String NUMBER_OF_RECORDS_IN_TRANSACTION_COLUMN =
+      "number_of_records_in_transaction";
+  private static final String NUMBER_OF_PARTITIONS_IN_TRANSACTION_COLUMN =
+      "number_of_partitions_in_transaction";
+  private static final String NAME_COLUMN = "name";
+  private static final String TYPE_COLUMN = "type";
+  private static final String IS_PRIMARY_KEY_COLUMN = "is_primary_key";
+  private static final String ORDINAL_POSITION_COLUMN = "ordinal_position";
+  private static final String KEYS_COLUMN = "keys";
+  private static final String OLD_VALUES_COLUMN = "old_values";
+  private static final String NEW_VALUES_COLUMN = "new_values";
+
+  private static final String TIMESTAMP_COLUMN = "timestamp";
+
+  private static final String START_TIMESTAMP_COLUMN = "start_timestamp";
+  private static final String CHILD_PARTITIONS_COLUMN = "child_partitions";
+  private static final String PARENT_PARTITION_TOKENS_COLUMN = "parent_partition_tokens";
+  private static final String TOKEN_COLUMN = "token";
+
+  ChangeStreamRecordMapper() {}
+
+  /**
+   * Transforms a {@link Struct} representing a change stream result into a {@link List} of {@link
+   * ChangeStreamRecord} models. The type of each change stream record is identified, and one of
+   * the following subclasses can be returned within the resulting {@link List}:
+   *
+   * <ul>
+   *   <li>{@link DataChangeRecord}
+   *   <li>{@link HeartbeatRecord}
+   *   <li>{@link ChildPartitionsRecord}
+   * </ul>
+   *
+   * In addition to the {@link Struct} received, the originating partition of the records (given by
+   * the {@link PartitionMetadata} parameter) and the stream metadata (given by the {@link
+   * ChangeStreamResultSetMetadata}) are used to populate the {@link ChangeStreamRecordMetadata} for
+   * each record mapped.
+   *
+   * <p>The {@link Struct} is expected to have the following fields:
+   *
+   * <ul>
+   *   <li>{@link ChangeStreamRecordMapper#DATA_CHANGE_RECORD_COLUMN}: non-nullable {@link Struct}
+   *       list of data change records.
+   *       <ul>
+   *         <li>{@link ChangeStreamRecordMapper#COMMIT_TIMESTAMP_COLUMN}: non-nullable {@link
+   *             Timestamp} representing the timestamp at which the modifications within the record
+   *             were committed in Cloud Spanner.
+   *         <li>{@link ChangeStreamRecordMapper#SERVER_TRANSACTION_ID_COLUMN}: non-nullable {@link
+   *             String} representing the unique transaction id in which the modifications for this
+   *             record occurred.
+   *         <li>{@link ChangeStreamRecordMapper#IS_LAST_RECORD_IN_TRANSACTION_IN_PARTITION_COLUMN}:
+   *             non-nullable {@link Boolean} indicating whether this record is the last one emitted
+   *             for the transaction in this partition.
+   *         <li>{@link ChangeStreamRecordMapper#RECORD_SEQUENCE_COLUMN}: non-nullable {@link
+   *             String} representing the order in which this record appears within the context of a
+   *             partition, commit timestamp and transaction.
+   *         <li>{@link ChangeStreamRecordMapper#TABLE_NAME_COLUMN}: non-nullable {@link String}
+   *             representing the name of the table in which the modifications for this record
+   *             occurred.
+   *         <li>{@link ChangeStreamRecordMapper#COLUMN_TYPES_COLUMN}: non-nullable {@link List} of
+   *             {@link Struct}s representing the type of the primary keys and modified columns
+   *             within this record.
+   *             <ul>
+   *               <li>{@link ChangeStreamRecordMapper#NAME_COLUMN}: non-nullable {@link String}
+   *                   representing the name of a column.
+   *               <li>{@link ChangeStreamRecordMapper#TYPE_COLUMN}: non-nullable {@link String}
+   *                   representing the type of a column.
+   *               <li>{@link ChangeStreamRecordMapper#IS_PRIMARY_KEY_COLUMN}: non-nullable {@link
+   *                   Boolean} indicating if the column is part of the primary key.
+   *               <li>{@link ChangeStreamRecordMapper#ORDINAL_POSITION_COLUMN}: non-nullable {@link
+   *                   Long} representing the position of the column in the table in which it is defined.
+   *             </ul>
+   *         <li>{@link ChangeStreamRecordMapper#MODS_COLUMN}: non-nullable {@link List} of {@link
+   *             Struct}s representing the data modifications within this record.
+   *             <ul>
+   *               <li>{@link ChangeStreamRecordMapper#KEYS_COLUMN}: non-nullable {@link String}
+   *                   JSON object, where keys are the primary key column names, and the values are
+   *                   their corresponding values.
+   *               <li>{@link ChangeStreamRecordMapper#OLD_VALUES_COLUMN}: nullable {@link String}
+   *                   JSON object displaying the old state of the columns modified, where keys are
+   *                   the column names, and the values are their corresponding values.
+   *               <li>{@link ChangeStreamRecordMapper#NEW_VALUES_COLUMN}: nullable {@link String}
+   *                   JSON object displaying the new state of the columns modified, where keys are
+   *                   the column names, and the values are their corresponding values.
+   *             </ul>
+   *         <li>{@link ChangeStreamRecordMapper#MOD_TYPE_COLUMN}: non-nullable {@link String}
+   *             representing the type of operation that caused the modifications (see also {@link
+   *             ModType}).
+   *         <li>{@link ChangeStreamRecordMapper#VALUE_CAPTURE_TYPE_COLUMN}: non-nullable {@link
+   *             String} representing the capture type of the change stream that generated this
+   *             record (see also {@link ValueCaptureType}).
+   *         <li>{@link ChangeStreamRecordMapper#NUMBER_OF_RECORDS_IN_TRANSACTION_COLUMN}:
+   *             non-nullable {@link Long} representing the total number of data change records for
+   *             the transaction in which this record occurred.
+   *         <li>{@link ChangeStreamRecordMapper#NUMBER_OF_PARTITIONS_IN_TRANSACTION_COLUMN}:
+   *             non-nullable {@link Long} representing the total number of partitions for the
+   *             transaction in which this record occurred.
+   *       </ul>
+   *   <li>{@link ChangeStreamRecordMapper#HEARTBEAT_RECORD_COLUMN}: non-nullable {@link Struct}
+   *       list of heartbeat records.
+   *       <ul>
+   *         <li>{@link ChangeStreamRecordMapper#TIMESTAMP_COLUMN}: non-nullable {@link Timestamp}
+   *             representing the timestamp for which the change stream query has returned all
+   *             changes (see {@link HeartbeatRecord#getTimestamp()} for more details).
+   *       </ul>
+   *   <li>{@link ChangeStreamRecordMapper#CHILD_PARTITIONS_RECORD_COLUMN}: non-nullable {@link
+   *       Struct} list of child partitions records.
+   *       <ul>
+   *         <li>{@link ChangeStreamRecordMapper#START_TIMESTAMP_COLUMN}: non-nullable {@link
+   *             Timestamp} representing the timestamp at which the new partition started being
+   *             valid in Cloud Spanner.
+   *         <li>{@link ChangeStreamRecordMapper#RECORD_SEQUENCE_COLUMN}: non-nullable {@link
+   *             String} representing the order in which this record appears within the context of a
+   *             partition and commit timestamp.
+   *         <li>{@link ChangeStreamRecordMapper#CHILD_PARTITIONS_COLUMN}: non-nullable {@link List}
+   *             of {@link Struct} representing the new child partitions.
+   *             <ul>
+   *               <li>{@link ChangeStreamRecordMapper#TOKEN_COLUMN}: non-nullable {@link String}
+   *                   representing the unique identifier of the new child partition.
+   *               <li>{@link ChangeStreamRecordMapper#PARENT_PARTITION_TOKENS_COLUMN}: non-nullable
+   *                   {@link List} of {@link String} representing the unique identifier(s) of
+   *                   the parent partition(s) from which this child partition originated.
+   *             </ul>
+   *       </ul>
+   * </ul>
+   *
+   * @param partition the partition metadata from which the row was generated
+   * @param row the struct row, representing a single change stream result (it may contain multiple
+   *     change stream records within)
+   * @param resultSetMetadata the metadata generated when reading the change stream row
+   * @return a {@link List} of {@link ChangeStreamRecord} subclasses
+   */
+  public List<ChangeStreamRecord> toChangeStreamRecords(
+      PartitionMetadata partition, Struct row, ChangeStreamResultSetMetadata resultSetMetadata) {
+    return row.getStructList(0).stream()
+        .flatMap(struct -> toChangeStreamRecord(partition, struct, resultSetMetadata))
+        .collect(Collectors.toList());
+  }
+
+  private Stream<ChangeStreamRecord> toChangeStreamRecord(
+      PartitionMetadata partition, Struct row, ChangeStreamResultSetMetadata resultSetMetadata) {
+
+    final Stream<DataChangeRecord> dataChangeRecords =
+        row.getStructList(DATA_CHANGE_RECORD_COLUMN).stream()
+            .filter(this::isNonNullDataChangeRecord)
+            .map(struct -> toDataChangeRecord(partition, struct, resultSetMetadata));
+
+    final Stream<HeartbeatRecord> heartbeatRecords =
+        row.getStructList(HEARTBEAT_RECORD_COLUMN).stream()
+            .filter(this::isNonNullHeartbeatRecord)
+            .map(struct -> toHeartbeatRecord(partition, struct, resultSetMetadata));
+
+    final Stream<ChildPartitionsRecord> childPartitionsRecords =
+        row.getStructList(CHILD_PARTITIONS_RECORD_COLUMN).stream()
+            .filter(this::isNonNullChildPartitionsRecord)
+            .map(struct -> toChildPartitionsRecord(partition, struct, resultSetMetadata));
+
+    return Stream.concat(
+        Stream.concat(dataChangeRecords, heartbeatRecords), childPartitionsRecords);

Review comment:
       Oh, this is not a problem, because `DataChangeRecord` timestamps will be < `HeartbeatRecord` timestamps, which in turn will be < `ChildPartitionsRecord` timestamps.
   
   Other than that, I don't think we have a mix of these types of records in a single response at the moment.
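A minimal sketch of the concatenation behaviour discussed in this thread, using a hypothetical simplified `Rec` type and `ConcatOrderSketch` class in place of the connector's model classes (neither is part of Beam). `Stream.concat` keeps the order within each per-type stream but always emits data change records first, then heartbeats, then child partitions records; the merged output is timestamp-ordered only under the assumption stated in the reply, that each group's timestamps precede those of the next group.

```java
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical stand-in for a mapped change stream record; not the connector's model.
final class Rec {
  final String kind;     // "data", "heartbeat" or "child"
  final long timestamp;  // simplified stand-in for the record's Cloud Spanner Timestamp

  Rec(String kind, long timestamp) {
    this.kind = kind;
    this.timestamp = timestamp;
  }
}

final class ConcatOrderSketch {
  // Mirrors the shape of toChangeStreamRecord: null entries are filtered out and
  // the three per-type streams are concatenated in a fixed order.
  static List<Rec> merge(List<Rec> data, List<Rec> heartbeats, List<Rec> children) {
    Stream<Rec> d = data.stream().filter(Objects::nonNull);
    Stream<Rec> h = heartbeats.stream().filter(Objects::nonNull);
    Stream<Rec> c = children.stream().filter(Objects::nonNull);
    return Stream.concat(Stream.concat(d, h), c).collect(Collectors.toList());
  }

  // Returns true if timestamps never decrease across the merged list.
  static boolean isTimestampOrdered(List<Rec> records) {
    for (int i = 1; i < records.size(); i++) {
      if (records.get(i - 1).timestamp > records.get(i).timestamp) {
        return false;
      }
    }
    return true;
  }
}
```

If a heartbeat could ever be older than a data change record in the same result, `isTimestampOrdered` would report the grouping as out of order, which is the situation the review question asks about.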




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pabloem commented on a change in pull request #16203: [BEAM-12164] Add Spanner Change Stream Mappers

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #16203:
URL: https://github.com/apache/beam/pull/16203#discussion_r780034446



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
##########
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.dao;
+
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics.PARTITION_ID_ATTRIBUTE_LABEL;
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_CREATED_AT;
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_END_TIMESTAMP;
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_FINISHED_AT;
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_HEARTBEAT_MILLIS;
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_PARENT_TOKENS;
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_PARTITION_TOKEN;
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_RUNNING_AT;
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_SCHEDULED_AT;
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_START_TIMESTAMP;
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_STATE;
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_WATERMARK;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.DatabaseClient;
+import com.google.cloud.spanner.Mutation;
+import com.google.cloud.spanner.ResultSet;
+import com.google.cloud.spanner.Statement;
+import com.google.cloud.spanner.Struct;
+import com.google.cloud.spanner.TransactionContext;
+import com.google.cloud.spanner.TransactionRunner;
+import com.google.cloud.spanner.Value;
+import io.opencensus.common.Scope;
+import io.opencensus.trace.AttributeValue;
+import io.opencensus.trace.Tracer;
+import io.opencensus.trace.Tracing;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+
+/** Data access object for the Connector metadata tables. */
+public class PartitionMetadataDao {
+  private static final Tracer TRACER = Tracing.getTracer();
+
+  private final String metadataTableName;
+  private final DatabaseClient databaseClient;
+
+  /**
+   * Constructs a partition metadata DAO object given the generated name of the metadata table.
+   *
+   * @param metadataTableName the name of the partition metadata table
+   * @param databaseClient the {@link DatabaseClient} to perform queries
+   */
+  PartitionMetadataDao(String metadataTableName, DatabaseClient databaseClient) {
+    this.metadataTableName = metadataTableName;
+    this.databaseClient = databaseClient;
+  }
+
+  /**
+   * Fetches the partition metadata row data for the given partition token.
+   *
+   * @param partitionToken the partition unique identifier
+   * @return the partition metadata row for the given token as a {@link Struct} if it exists;
+   *     otherwise, null.
+   */
+  public @Nullable Struct getPartition(String partitionToken) {
+    try (Scope scope = TRACER.spanBuilder("getPartition").setRecordEvents(true).startScopedSpan()) {
+      TRACER
+          .getCurrentSpan()
+          .putAttribute(
+              PARTITION_ID_ATTRIBUTE_LABEL, AttributeValue.stringAttributeValue(partitionToken));
+      try (ResultSet resultSet =
+          databaseClient
+              .singleUse()
+              .executeQuery(
+                  Statement.newBuilder(
+                          "SELECT * FROM "
+                              + metadataTableName
+                              + " WHERE "
+                              + COLUMN_PARTITION_TOKEN
+                              + " = @partition")
+                      .bind("partition")
+                      .to(partitionToken)
+                      .build())) {
+        if (resultSet.next()) {
+          return resultSet.getCurrentRowAsStruct();
+        }
+        return null;

Review comment:
       How do we handle the case where the partition is missing? I suppose this should not happen, right?
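One way a caller could make the missing-partition case explicit is to wrap the nullable result of a `getPartition`-style lookup in an `Optional`. The sketch below is hypothetical: `PartitionLookup`, `findPartition` and `requirePartition` are illustrative names not found in the connector, and a `Map` stands in for the metadata table and its `Struct` rows.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for PartitionMetadataDao#getPartition, which returns null
// when no row matches the token. A Map plays the role of the metadata table.
final class PartitionLookup {
  private final Map<String, String> rowsByToken = new HashMap<>();

  void put(String token, String row) {
    rowsByToken.put(token, row);
  }

  // Same contract as the DAO method: the row if present, otherwise null.
  String getPartition(String token) {
    return rowsByToken.get(token);
  }

  // Wraps the nullable result so callers must decide how to handle absence.
  Optional<String> findPartition(String token) {
    return Optional.ofNullable(getPartition(token));
  }

  // Treats a missing partition as an invariant violation and fails loudly.
  String requirePartition(String token) {
    return findPartition(token)
        .orElseThrow(() -> new IllegalStateException("Partition not found: " + token));
  }
}
```

Failing fast in `requirePartition` fits the expectation in the comment that a missing partition "should not happen"; `findPartition` suits callers for which absence is a legitimate state.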

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java
##########
@@ -0,0 +1,324 @@
+  private Stream<ChangeStreamRecord> toChangeStreamRecord(
+      PartitionMetadata partition, Struct row, ChangeStreamResultSetMetadata resultSetMetadata) {
+
+    final Stream<DataChangeRecord> dataChangeRecords =
+        row.getStructList(DATA_CHANGE_RECORD_COLUMN).stream()
+            .filter(this::isNonNullDataChangeRecord)
+            .map(struct -> toDataChangeRecord(partition, struct, resultSetMetadata));
+
+    final Stream<HeartbeatRecord> heartbeatRecords =
+        row.getStructList(HEARTBEAT_RECORD_COLUMN).stream()
+            .filter(this::isNonNullHeartbeatRecord)
+            .map(struct -> toHeartbeatRecord(partition, struct, resultSetMetadata));
+
+    final Stream<ChildPartitionsRecord> childPartitionsRecords =
+        row.getStructList(CHILD_PARTITIONS_RECORD_COLUMN).stream()
+            .filter(this::isNonNullChildPartitionsRecord)
+            .map(struct -> toChildPartitionsRecord(partition, struct, resultSetMetadata));
+
+    return Stream.concat(
+        Stream.concat(dataChangeRecords, heartbeatRecords), childPartitionsRecords);

Review comment:
       I'm sure you've thought about this, but does it not matter that we're reordering the records here? (moving all Data Change Records to the beginning, then heartbeats, then child partitions)
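If the relative order across record types did matter, one option would be to re-sort the mapped records by timestamp after concatenation. A minimal sketch with a hypothetical `TimedRecord` type and `TimestampReorderSketch` class (the real models carry different timestamp fields per record type, which this does not model). Because `Stream.sorted` is stable for ordered streams, records with equal timestamps keep their concatenation order.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical simplified record: a type label plus a timestamp.
final class TimedRecord {
  final String kind;
  final long timestamp;

  TimedRecord(String kind, long timestamp) {
    this.kind = kind;
    this.timestamp = timestamp;
  }
}

final class TimestampReorderSketch {
  // Stream.sorted is stable for ordered streams (such as a List's stream),
  // so ties keep their relative order from the input list.
  static List<TimedRecord> sortByTimestamp(List<TimedRecord> records) {
    return records.stream()
        .sorted(Comparator.comparingLong((TimedRecord r) -> r.timestamp))
        .collect(Collectors.toList());
  }
}
```

Given the reply that data change, heartbeat and child partitions timestamps are already ordered by type, this extra sort would be a no-op in practice; it only matters if that invariant were ever relaxed.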

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java
##########
@@ -0,0 +1,324 @@
+public class ChangeStreamRecordMapper {
+
+  private static final String DATA_CHANGE_RECORD_COLUMN = "data_change_record";

Review comment:
       I suppose this will be largely understood by the maintainers, but could we add an action item to document some of these constants for newcomers trying to become familiar with them? Perhaps link to the Spanner CDC docs.
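       As an illustration of the kind of documentation being requested, here is a hypothetical sketch of the column-name constants with Javadoc. Only `DATA_CHANGE_RECORD_COLUMN = "data_change_record"` appears in the diff; the holder class `ChangeStreamColumns` and the string values of the heartbeat and child partitions constants are assumptions, not taken from the PR.

```java
/** Hypothetical holder documenting the change stream result column names. */
final class ChangeStreamColumns {
  /**
   * Column of a change stream query result row holding the list of data change record
   * structs; the mapper reads it with {@code row.getStructList(DATA_CHANGE_RECORD_COLUMN)}.
   */
  static final String DATA_CHANGE_RECORD_COLUMN = "data_change_record";

  /** Column holding the list of heartbeat record structs (string value assumed). */
  static final String HEARTBEAT_RECORD_COLUMN = "heartbeat_record";

  /** Column holding the list of child partitions record structs (string value assumed). */
  static final String CHILD_PARTITIONS_RECORD_COLUMN = "child_partitions_record";

  private ChangeStreamColumns() {} // constants only; not instantiable
}
```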

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamMetrics.java
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams;
+
+import java.io.Serializable;
+
+/** Class to aggregate metrics related functionality. */
+public class ChangeStreamMetrics implements Serializable {

Review comment:
       I suppose for now this doesn't do anything?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] thiagotnunes commented on a change in pull request #16203: [BEAM-12164] Add Spanner Change Stream Mappers

Posted by GitBox <gi...@apache.org>.
thiagotnunes commented on a change in pull request #16203:
URL: https://github.com/apache/beam/pull/16203#discussion_r780039090



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java
##########
@@ -0,0 +1,324 @@
+  private static final String DATA_CHANGE_RECORD_COLUMN = "data_change_record";

Review comment:
       Good idea. We don't have public documentation at the moment, so I will link to the private documentation for now.







[GitHub] [beam] thiagotnunes commented on pull request #16203: [BEAM-12164] Add Spanner Change Stream Mappers

Posted by GitBox <gi...@apache.org>.
thiagotnunes commented on pull request #16203:
URL: https://github.com/apache/beam/pull/16203#issuecomment-992094285


   DO NOT MERGE, depends on https://github.com/apache/beam/pull/16161





[GitHub] [beam] thiagotnunes commented on pull request #16203: [BEAM-12164] Add Spanner Change Stream Mappers

Posted by GitBox <gi...@apache.org>.
thiagotnunes commented on pull request #16203:
URL: https://github.com/apache/beam/pull/16203#issuecomment-1004423602


    retest this please





[GitHub] [beam] pabloem merged pull request #16203: [BEAM-12164] Add Spanner Change Stream Mappers

Posted by GitBox <gi...@apache.org>.
pabloem merged pull request #16203:
URL: https://github.com/apache/beam/pull/16203


   

