Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/08/03 23:15:31 UTC

[GitHub] [beam] laraschmidt opened a new pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

laraschmidt opened a new pull request #15275:
URL: https://github.com/apache/beam/pull/15275


   Adds EventTimeEquiJoin, a PTransform that joins two streams on equal keys, where a match also requires the two elements' timestamps to fall within a bounded distance of each other.
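
To make the "timestamp-bounded" condition concrete, here is a minimal sketch of the matching rule only. It is not the implementation in this PR and uses no Beam APIs: `TimeBoundedJoinSketch`, `Event`, and `maxDiffMillis` are hypothetical names, and the nested loop over in-memory lists stands in for what a streaming transform would have to do by buffering and expiring elements.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of a timestamp-bounded equi-join; not Beam code. */
public class TimeBoundedJoinSketch {

  /** Hypothetical event: join key, event timestamp in milliseconds, payload. */
  static final class Event {
    final String key;
    final long timestampMillis;
    final String value;

    Event(String key, long timestampMillis, String value) {
      this.key = key;
      this.timestampMillis = timestampMillis;
      this.value = value;
    }
  }

  /**
   * Pairs every left event with every right event that has the same key and
   * whose timestamp differs by at most maxDiffMillis.
   */
  static List<String> join(List<Event> left, List<Event> right, long maxDiffMillis) {
    List<String> out = new ArrayList<>();
    for (Event l : left) {
      for (Event r : right) {
        boolean sameKey = l.key.equals(r.key);
        boolean closeInTime = Math.abs(l.timestampMillis - r.timestampMillis) <= maxDiffMillis;
        if (sameKey && closeInTime) {
          out.add(l.value + "+" + r.value);
        }
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Event> left = Arrays.asList(new Event("k", 1000L, "a"), new Event("k", 9000L, "b"));
    List<Event> right = Arrays.asList(new Event("k", 1500L, "x"), new Event("j", 1000L, "y"));
    // Only a+x matches: same key and 500 ms apart, within the 1000 ms bound.
    System.out.println(join(left, right, 1000L));
  }
}
```

In a streaming setting the bound is what makes the join tractable: an element only has to be retained until no element on the other side can still fall within the allowed timestamp distance.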
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] aaltay edited a comment on pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

Posted by GitBox <gi...@apache.org>.
aaltay edited a comment on pull request #15275:
URL: https://github.com/apache/beam/pull/15275#issuecomment-1002723320


   ~Is this still relevant? Should we close it?~ 
   
   Ignore that. I saw the recent changes.





[GitHub] [beam] aaltay commented on pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

Posted by GitBox <gi...@apache.org>.
aaltay commented on pull request #15275:
URL: https://github.com/apache/beam/pull/15275#issuecomment-1002723320


   Is this still relevant? Should we close it?





[GitHub] [beam] aaltay commented on pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

Posted by GitBox <gi...@apache.org>.
aaltay commented on pull request #15275:
URL: https://github.com/apache/beam/pull/15275#issuecomment-1007136968


   @laraschmidt - how is your other PR doing? :)





[GitHub] [beam] laraschmidt commented on a change in pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

Posted by GitBox <gi...@apache.org>.
laraschmidt commented on a change in pull request #15275:
URL: https://github.com/apache/beam/pull/15275#discussion_r727405900



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/PairCoder.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.join;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StructuredCoder;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+
+/** A {@link Coder} for {@link Pair}s that defers to underlying coders. */
+public class PairCoder<V1, V2> extends StructuredCoder<Pair<V1, V2>> {
+  private final Coder<V1> firstCoder;
+  private final Coder<V2> secondCoder;
+
+  private PairCoder(Coder<V1> firstCoder, Coder<V2> secondCoder) {
+    this.firstCoder = firstCoder;
+    this.secondCoder = secondCoder;
+  }
+
+  /** Returns a {@link PairCoder} for the given underlying value coders. */
+  public static <V1, V2> PairCoder<V1, V2> of(Coder<V1> firstCoder, Coder<V2> secondCoder) {
+    return new PairCoder<>(firstCoder, secondCoder);
+  }
+
+  @Override
+  public void encode(Pair<V1, V2> value, OutputStream outStream)
+      throws CoderException, IOException {
+    firstCoder.encode(value.getFirst(), outStream);

Review comment:
       I'm not familiar with the deprecated context mechanism, but I copied it over from KvCoder. What exactly is it used for? I assume that in my case I want the context on both elements (whereas KvCoder only applies it to the value).







[GitHub] [beam] laraschmidt commented on a change in pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

Posted by GitBox <gi...@apache.org>.
laraschmidt commented on a change in pull request #15275:
URL: https://github.com/apache/beam/pull/15275#discussion_r727412594



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/PairCoder.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.join;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StructuredCoder;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+
+/** A {@link Coder} for {@link Pair}s that defers to underlying coders. */
+public class PairCoder<V1, V2> extends StructuredCoder<Pair<V1, V2>> {
+  private final Coder<V1> firstCoder;
+  private final Coder<V2> secondCoder;
+
+  private PairCoder(Coder<V1> firstCoder, Coder<V2> secondCoder) {
+    this.firstCoder = firstCoder;
+    this.secondCoder = secondCoder;
+  }
+
+  /** Returns a {@link PairCoder} for the given underlying value coders. */
+  public static <V1, V2> PairCoder<V1, V2> of(Coder<V1> firstCoder, Coder<V2> secondCoder) {
+    return new PairCoder<>(firstCoder, secondCoder);
+  }
+
+  @Override
+  public void encode(Pair<V1, V2> value, OutputStream outStream)
+      throws CoderException, IOException {
+    firstCoder.encode(value.getFirst(), outStream);

Review comment:
       Hrm, passing the context to both failed, so I swapped it to be on the second element only.
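
A minimal sketch of why passing the context to both components is likely to fail. It does not use Beam's `Coder` API; `PairContextSketch`, `encodeNested`, and `encodeOuter` are hypothetical stand-ins. The assumption is that a "nested" encoding must be self-delimiting (length-prefixed here) because more data follows it, while an "outer" encoding may omit the delimiter and treat the rest of the stream as its value. Under that assumption only the last component of a pair can safely use the outer form.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical illustration of nested vs. outer encoding; not Beam's Coder API. */
public class PairContextSketch {

  /** Nested form: length-prefixed, so the reader knows where the value ends. */
  static void encodeNested(String s, DataOutputStream out) throws IOException {
    byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
    out.writeInt(bytes.length);
    out.write(bytes);
  }

  static String decodeNested(DataInputStream in) throws IOException {
    byte[] bytes = new byte[in.readInt()];
    in.readFully(bytes);
    return new String(bytes, StandardCharsets.UTF_8);
  }

  /** Outer form: no prefix; the value is whatever remains in the stream. */
  static void encodeOuter(String s, DataOutputStream out) throws IOException {
    out.write(s.getBytes(StandardCharsets.UTF_8));
  }

  static String decodeOuter(DataInputStream in) throws IOException {
    ByteArrayOutputStream rest = new ByteArrayOutputStream();
    byte[] buf = new byte[256];
    int n;
    while ((n = in.read(buf)) != -1) {
      rest.write(buf, 0, n);
    }
    return new String(rest.toByteArray(), StandardCharsets.UTF_8);
  }

  /** Encodes a pair, then decodes it; the second element always uses the outer form. */
  static String[] roundTrip(String first, String second, boolean firstIsNested)
      throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    if (firstIsNested) {
      encodeNested(first, out);
    } else {
      encodeOuter(first, out);
    }
    encodeOuter(second, out);
    out.flush();

    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    String decodedFirst = firstIsNested ? decodeNested(in) : decodeOuter(in);
    String decodedSecond = decodeOuter(in);
    return new String[] {decodedFirst, decodedSecond};
  }

  public static void main(String[] args) throws IOException {
    // First nested, second outer: the pair round-trips.
    System.out.println(String.join("|", roundTrip("ab", "cd", true))); // prints "ab|cd"
    // Both outer: the first decode swallows the second element's bytes.
    System.out.println(String.join("|", roundTrip("ab", "cd", false))); // prints "abcd|"
  }
}
```

This matches the arrangement described above for KvCoder, where only the value receives the caller's context and the key is always encoded in its self-delimiting form.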







[GitHub] [beam] laraschmidt commented on a change in pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

Posted by GitBox <gi...@apache.org>.
laraschmidt commented on a change in pull request #15275:
URL: https://github.com/apache/beam/pull/15275#discussion_r727405900



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/PairCoder.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.join;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StructuredCoder;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+
+/** A {@link Coder} for {@link Pair}s that defers to underlying coders. */
+public class PairCoder<V1, V2> extends StructuredCoder<Pair<V1, V2>> {
+  private final Coder<V1> firstCoder;
+  private final Coder<V2> secondCoder;
+
+  private PairCoder(Coder<V1> firstCoder, Coder<V2> secondCoder) {
+    this.firstCoder = firstCoder;
+    this.secondCoder = secondCoder;
+  }
+
+  /** Returns a {@link PairCoder} for the given underlying value coders. */
+  public static <V1, V2> PairCoder<V1, V2> of(Coder<V1> firstCoder, Coder<V2> secondCoder) {
+    return new PairCoder<>(firstCoder, secondCoder);
+  }
+
+  @Override
+  public void encode(Pair<V1, V2> value, OutputStream outStream)
+      throws CoderException, IOException {
+    firstCoder.encode(value.getFirst(), outStream);

Review comment:
       Not familiar with the deprecated context thing but I copied it over from KVCoder. What exactly is it used for? I assume that in my case I want context on both (whereas the KV coder only put it on the value). 

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/PairCoder.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.join;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StructuredCoder;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+
+/** A {@link Coder} for {@link Pair}s that defers to underlying coders. */
+public class PairCoder<V1, V2> extends StructuredCoder<Pair<V1, V2>> {
+  private final Coder<V1> firstCoder;
+  private final Coder<V2> secondCoder;
+
+  private PairCoder(Coder<V1> firstCoder, Coder<V2> secondCoder) {
+    this.firstCoder = firstCoder;
+    this.secondCoder = secondCoder;
+  }
+
+  /** Returns a {@link PairCoder} for the given underlying value coders. */
+  public static <V1, V2> PairCoder<V1, V2> of(Coder<V1> firstCoder, Coder<V2> secondCoder) {
+    return new PairCoder<>(firstCoder, secondCoder);
+  }
+
+  @Override
+  public void encode(Pair<V1, V2> value, OutputStream outStream)
+      throws CoderException, IOException {
+    firstCoder.encode(value.getFirst(), outStream);

Review comment:
       Hrm, passing the context to both coders failed, so I changed it to pass the context only to the second element's coder.
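
       For background on the context question: in Beam's deprecated Coder.Context, a nested context means more data follows the value, so the value must be self-delimiting, while the outer context means the value owns the rest of the stream and may omit its length. KvCoder encodes the key nested and hands the caller's context only to the value, which matches the failure seen when both components received it. The following is a minimal stdlib-only sketch of that idea, not Beam code; PairEncodingSketch and all of its methods are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in for a two-field coder; this is not Beam's PairCoder. */
final class PairEncodingSketch {

  /** First field is length-prefixed ("nested"); second field fills the rest ("outer"). */
  static byte[] encode(String first, String second) {
    byte[] firstBytes = first.getBytes(StandardCharsets.UTF_8);
    byte[] secondBytes = second.getBytes(StandardCharsets.UTF_8);
    ByteBuffer buf = ByteBuffer.allocate(4 + firstBytes.length + secondBytes.length);
    buf.putInt(firstBytes.length); // delimiter: tells the decoder where field one ends
    buf.put(firstBytes);
    buf.put(secondBytes); // no prefix: field two owns the remainder of the bytes
    return buf.array();
  }

  static String[] decode(byte[] encoded) {
    ByteBuffer buf = ByteBuffer.wrap(encoded);
    byte[] firstBytes = new byte[buf.getInt()];
    buf.get(firstBytes);
    byte[] secondBytes = new byte[buf.remaining()]; // everything left is field two
    buf.get(secondBytes);
    return new String[] {
      new String(firstBytes, StandardCharsets.UTF_8),
      new String(secondBytes, StandardCharsets.UTF_8)
    };
  }

  /** Writes BOTH fields without a delimiter, as if both were encoded in the outer context. */
  static byte[] encodeBothOuter(String first, String second) {
    return (first + second).getBytes(StandardCharsets.UTF_8);
  }

  /** Field one reads to the end of the bytes, so nothing is left for field two. */
  static String[] decodeBothOuter(byte[] encoded) {
    return new String[] {new String(encoded, StandardCharsets.UTF_8), ""};
  }
}
```

       With only the last field left unprefixed the pair round-trips; with both unprefixed the boundary between the two fields is lost, which is one plausible reading of why "context on both" failed.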




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] laraschmidt commented on pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

Posted by GitBox <gi...@apache.org>.
laraschmidt commented on pull request #15275:
URL: https://github.com/apache/beam/pull/15275#issuecomment-903005829


   @reuvenlax @kennknowles I implemented changes as if we were to remove the checks we discussed offline (one I avoided with the skew and the other by removing it). Please take a look, thanks!





[GitHub] [beam] laraschmidt commented on pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

Posted by GitBox <gi...@apache.org>.
laraschmidt commented on pull request #15275:
URL: https://github.com/apache/beam/pull/15275#issuecomment-1007579994


   The other one is in, but it does not work for the portable runner, because making it work for the portable runner breaks Dataflow on the portable runner. :)
   
   But the changes can actually be treated as pretty independent. That PR was about un-deprecating the function we use here, but we can still use the function in this PR even while it is deprecated. So this is still ready for review.





[GitHub] [beam] reuvenlax commented on pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15275:
URL: https://github.com/apache/beam/pull/15275#issuecomment-1007610045









[GitHub] [beam] reuvenlax commented on pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15275:
URL: https://github.com/apache/beam/pull/15275#issuecomment-1002726634


   I think it's still relevant. It was blocked on Lara's other PR.
   
   On Wed, Dec 29, 2021 at 10:23 AM Ahmet Altay wrote:
   
   > Is this still relevant? Should we close it?
   





[GitHub] [beam] reuvenlax commented on pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15275:
URL: https://github.com/apache/beam/pull/15275#issuecomment-1007633829


   Right, but you need to be able to set a timer with a hold older than the
   current input element, right?
   
   On Fri, Jan 7, 2022 at 10:23 AM laraschmidt wrote:
   
   > That was for an earlier iteration. The solution we ended up going with
   > only outputs from the actual doFn and only uses timers to hold the
   > watermark. So we didn't end up actually needing to allow older elements
   > from timers because we accepted that we will allow older elements from the
   > actual DoFn.
   





[GitHub] [beam] reuvenlax commented on pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15275:
URL: https://github.com/apache/beam/pull/15275#issuecomment-1007674641


   I need to take a look at the code, since it's changed a lot. However,
   elements don't arrive in timestamp order, so I'm not sure how you
   would avoid this.
   
   On Fri, Jan 7, 2022 at 10:35 AM laraschmidt wrote:
   
   > I'm trying to page this back in. But if I remember correctly, the first
   > element holds the watermark back until all not-late second elements would
   > have appeared. So we don't actually need to hold it backwards. It's just
   > the processElement that puts out older elements.
   >
   > Here's the relevant code for the timers, which are not negative:
   >     if (elementAffectsWatermarkHolds) {
   >       addTimer(watermarkHolds, ts.plus(thisCollectionValidFor)).withOutputTimestamp(ts);
   





[GitHub] [beam] laraschmidt commented on pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

Posted by GitBox <gi...@apache.org>.
laraschmidt commented on pull request #15275:
URL: https://github.com/apache/beam/pull/15275#issuecomment-1007641545


   I'm trying to page this back in. But if I remember correctly, the first element holds the watermark back until all not-late second elements would have appeared. So we don't actually need to hold it backwards. It's just the processElement that puts out older elements.
   
   Here's the relevant code for the timers, which are not negative:
         if (elementAffectsWatermarkHolds) {
           addTimer(watermarkHolds, ts.plus(thisCollectionValidFor)).withOutputTimestamp(ts);
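
   The hold described above can be modeled roughly as follows. This is a minimal stdlib-only sketch, not Beam code: WatermarkHoldSketch, addHold, advanceInputWatermark, and outputWatermark are hypothetical names, and it assumes a runner computes the output watermark as the minimum of the input watermark and the earliest output timestamp among pending timers.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of a timer that holds the output watermark; not a Beam API. */
final class WatermarkHoldSketch {
  // Each pending timer is {fireAtMillis, holdMillis}.
  private final List<long[]> pendingTimers = new ArrayList<>();
  private long inputWatermark = Long.MIN_VALUE;

  /** Mirrors a timer set at ts + validFor with an output timestamp of ts. */
  void addHold(long elementTs, long validForMillis) {
    pendingTimers.add(new long[] {elementTs + validForMillis, elementTs});
  }

  /** Advances the input watermark and drops timers whose fire time has been reached. */
  void advanceInputWatermark(long newWatermark) {
    inputWatermark = Math.max(inputWatermark, newWatermark);
    pendingTimers.removeIf(timer -> timer[0] <= inputWatermark);
  }

  /** Output watermark: the input watermark, capped by the earliest pending hold. */
  long outputWatermark() {
    long result = inputWatermark;
    for (long[] timer : pendingTimers) {
      result = Math.min(result, timer[1]);
    }
    return result;
  }
}
```

   In this model, an element at timestamp 100 with a validity of 50 keeps the output watermark at 100 until the input watermark reaches 150, so a match emitted from processElement with timestamp 100 during that period is not behind the output watermark.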





[GitHub] [beam] laraschmidt commented on pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

Posted by GitBox <gi...@apache.org>.
laraschmidt commented on pull request #15275:
URL: https://github.com/apache/beam/pull/15275#issuecomment-914614453


   Ping. :) @reuvenlax @kennknowles 





[GitHub] [beam] laraschmidt commented on pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

Posted by GitBox <gi...@apache.org>.
laraschmidt commented on pull request #15275:
URL: https://github.com/apache/beam/pull/15275#issuecomment-933719496


   Run Java PreCommit





[GitHub] [beam] reuvenlax commented on a change in pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on a change in pull request #15275:
URL: https://github.com/apache/beam/pull/15275#discussion_r682982815



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/EventTimeEquiJoin.java
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.join;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.state.OrderedListState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.TimerMap;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} on two PCollections. A
+ * {@link EventTimeEquiJoin} joins elements with equal keys bounded by the difference in event time.
+ *
+ * <p>Example of performing a {@link EventTimeEquiJoin}:
+ *
+ * <pre>{@code
+ * PCollection<KV<K, V1>> pt1 = ...;
+ * PCollection<KV<K, V2>> pt2 = ...;
+ *
+ * PCollection<KV<K, Pair<V1, V2>> eventTimeEquiJoinCollection =
+ *   pt1.apply(EventTimeEquiJoin.<K, V1, V2>of(pt2));
+ *
+ * @param secondCollection the second collection to use in the join.
+ * @param <K> the type of the keys in the input {@code PCollection}s
+ * @param <V1> the type of the value in the first {@code PCollection}
+ * @param <V2> the type of the value in the second {@code PCollection}
+ * </pre>
+ */
+@AutoValue
+public abstract class EventTimeEquiJoin<K, V1, V2>
+    extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, Pair<V1, V2>>>> {
+  /** Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} on two PCollections. */
+  public static <K, V1, V2> EventTimeEquiJoin<K, V1, V2> of(
+      PCollection<KV<K, V2>> secondCollection) {
+    return new AutoValue_EventTimeEquiJoin.Builder<K, V1, V2>()
+        .setSecondCollection(secondCollection)
+        .setFirstCollectionValidFor(Duration.ZERO)
+        .setSecondCollectionValidFor(Duration.ZERO)
+        .setAllowedLateness(Duration.ZERO)
+        .build();
+  }
+
+  abstract PCollection<KV<K, V2>> getSecondCollection();
+
+  abstract Duration getFirstCollectionValidFor();
+
+  abstract Duration getSecondCollectionValidFor();
+
+  abstract Duration getAllowedLateness();
+
+  abstract Builder<K, V1, V2> toBuilder();
+
+  @AutoValue.Builder
+  public abstract static class Builder<K, V1, V2> {
+    public abstract Builder<K, V1, V2> setSecondCollection(PCollection<KV<K, V2>> value);
+
+    public abstract Builder<K, V1, V2> setFirstCollectionValidFor(Duration value);
+
+    public abstract Builder<K, V1, V2> setSecondCollectionValidFor(Duration value);
+
+    public abstract Builder<K, V1, V2> setAllowedLateness(Duration value);
+
+    abstract EventTimeEquiJoin<K, V1, V2> build();
+  }
+
+  /**
+   * Returns a {@code Impl<K, V1, V2>} {@code PTransform} that matches elements from the first and
+   * second collection with the same keys if their timestamps are within the given interval.
+   *
+   * @param interval the allowed difference between the timestamps to allow a match
+   */
+  public EventTimeEquiJoin<K, V1, V2> within(Duration interval) {
+    return toBuilder()
+        .setFirstCollectionValidFor(interval)
+        .setSecondCollectionValidFor(interval)
+        .build();
+  }
+
+  /**
+   * Returns a {@code Impl<K, V1, V2>} {@code PTransform} that matches elements from the first and
+   * second collection with the same keys if the collection's element comes within the valid time
+   * range for the other collection.
+   *
+   * @param firstCollectionValidFor the valid time range for the first collection
+   * @param secondCollectionValidFor the valid time range for the second collection
+   */
+  public EventTimeEquiJoin<K, V1, V2> within(
+      Duration firstCollectionValidFor, Duration secondCollectionValidFor) {
+    return toBuilder()
+        .setFirstCollectionValidFor(firstCollectionValidFor)
+        .setSecondCollectionValidFor(secondCollectionValidFor)
+        .build();
+  }
+
+  /**
+   * Returns a {@code Impl<K, V1, V2>} {@code PTransform} that matches elements from the first and
+   * second collection with the same keys and allows for late elements
+   *
+   * @param allowedLateness the amount of time late elements are allowed.
+   */
+  public EventTimeEquiJoin<K, V1, V2> withAllowedLateness(Duration allowedLateness) {
+    return toBuilder().setAllowedLateness(allowedLateness).build();
+  }
+
+  @Override
+  public PCollection<KV<K, Pair<V1, V2>>> expand(PCollection<KV<K, V1>> input) {
+    Coder<K> keyCoder = JoinUtils.getKeyCoder(input);
+    Coder<V1> firstValueCoder = JoinUtils.getValueCoder(input);
+    Coder<V2> secondValueCoder = JoinUtils.getValueCoder(getSecondCollection());
+    UnionCoder unionCoder = UnionCoder.of(ImmutableList.of(firstValueCoder, secondValueCoder));
+    KvCoder<K, RawUnionValue> kvCoder = KvCoder.of(JoinUtils.getKeyCoder(input), unionCoder);
+    PCollectionList<KV<K, RawUnionValue>> union =
+        PCollectionList.of(JoinUtils.makeUnionTable(0, input, kvCoder))
+            .and(JoinUtils.makeUnionTable(1, getSecondCollection(), kvCoder));
+    return union
+        .apply("Flatten", Flatten.pCollections())
+        .apply(
+            "Join",
+            ParDo.of(
+                new EventTimeEquiJoinDoFn<>(
+                    firstValueCoder,
+                    secondValueCoder,
+                    getFirstCollectionValidFor(),
+                    getSecondCollectionValidFor(),
+                    getAllowedLateness())))
+        .setCoder(KvCoder.of(keyCoder, PairCoder.<V1, V2>of(firstValueCoder, secondValueCoder)));
+  }
+
+  private static class EventTimeEquiJoinDoFn<K, V1, V2>
+      extends DoFn<KV<K, RawUnionValue>, KV<K, Pair<V1, V2>>> {
+    private static final int FIRST_TAG = 0;
+    private static final int SECOND_TAG = 1;
+
+    // Bucket cleanup timers into TIMER_BUCKET length buckets.
+    private static final long TIMER_BUCKET = Duration.standardMinutes(1).getMillis();
+
+    // How long elements in the first and second collection are valid (can be matched) for.
+    private final Duration firstCollectionValidFor;
+    private final Duration secondCollectionValidFor;
+
+    // How long past the watermark that late elements can show up.
+    private final Duration allowedLateness;
+
+    @StateId("v1Items")
+    private final StateSpec<OrderedListState<V1>> firstCollectionItems;
+
+    @StateId("v2Items")
+    private final StateSpec<OrderedListState<V2>> secondCollectionItems;
+
+    @TimerFamily("cleanupTimers")
+    private final TimerSpec cleanupTimers = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
+
+    public EventTimeEquiJoinDoFn(
+        Coder<V1> firstValueCoder,
+        Coder<V2> secondValueCoder,
+        Duration firstValidFor,
+        Duration secondValidFor,
+        Duration allowedLateness) {
+      this.firstCollectionValidFor = firstValidFor;
+      this.secondCollectionValidFor = secondValidFor;
+      this.allowedLateness = allowedLateness;
+      firstCollectionItems = StateSpecs.orderedList(firstValueCoder);
+      secondCollectionItems = StateSpecs.orderedList(secondValueCoder);
+    }
+
+    @FunctionalInterface
+    private interface Output<T1, T2> {
+      void apply(T1 one, T2 two);
+    }
+
+    private <T, O> void processHelper(
+        Output<T, O> output,
+        KV<K, RawUnionValue> element,
+        Instant ts,
+        OrderedListState<T> thisCollection,
+        OrderedListState<O> otherCollection,
+        TimerMap cleanupTimers,
+        Duration thisCollectionValidFor,
+        Duration otherCollectionValidFor) {
+      thisCollection.add(TimestampedValue.of((T) element.getValue().getValue(), ts));
+      Instant beginning = ts.minus(otherCollectionValidFor);
+      Instant end = ts.plus(thisCollectionValidFor).plus(1L);
+      for (TimestampedValue<O> value : otherCollection.readRange(beginning, end)) {
+        output.apply((T) element.getValue().getValue(), value.getValue());
+      }
+      Instant cleanupTime = ts.plus(allowedLateness).plus(thisCollectionValidFor);
+      Instant nextBucketStart =
+          Instant.ofEpochMilli(
+              cleanupTime.getMillis() / TIMER_BUCKET * TIMER_BUCKET + TIMER_BUCKET);
+      cleanupTimers.get(Long.toString(nextBucketStart.getMillis())).set(nextBucketStart);
+    }
+
+    @ProcessElement
+    public void process(
+        ProcessContext context,
+        @Element KV<K, RawUnionValue> element,
+        @Timestamp Instant ts,
+        @StateId("v1Items") OrderedListState<V1> firstItems,
+        @StateId("v2Items") OrderedListState<V2> secondItems,
+        @TimerFamily("cleanupTimers") TimerMap cleanupTimers) {
+      switch (element.getValue().getUnionTag()) {
+        case FIRST_TAG:
+          processHelper(
+              (V1 v1, V2 v2) -> {
+                context.output(KV.of(element.getKey(), Pair.of(v1, v2)));
+              },
+              element,
+              ts,
+              firstItems,
+              secondItems,
+              cleanupTimers,
+              firstCollectionValidFor,
+              secondCollectionValidFor);
+          break;
+        case SECOND_TAG:
+          processHelper(
+              (V2 v2, V1 v1) -> {
+                context.output(KV.of(element.getKey(), Pair.of(v1, v2)));
+              },
+              element,
+              ts,
+              secondItems,
+              firstItems,
+              cleanupTimers,
+              secondCollectionValidFor,
+              firstCollectionValidFor);
+      }
+    }
+
+    @OnTimerFamily("cleanupTimers")
+    public void onCleanupTimer(
+        @TimerId String timerId,
+        @StateId("v1Items") OrderedListState<V1> firstItems,
+        @StateId("v2Items") OrderedListState<V2> secondItems) {
+      Instant currentTime = Instant.ofEpochMilli(Long.valueOf(timerId)).minus(allowedLateness);
+      firstItems.clearRange(Instant.ofEpochMilli(0), currentTime.minus(firstCollectionValidFor));

Review comment:
       Start at BoundedWindow.TIMESTAMP_MIN_VALUE instead of Instant.ofEpochMilli(0).
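
       To illustrate why the lower bound matters: a range clear that starts at epoch 0 leaves behind any entries with negative (pre-1970) timestamps. The sketch below models the ordered state with a TreeMap from the Java standard library rather than Beam's OrderedListState; ClearRangeSketch, clearRange, sampleState, and MIN_TIMESTAMP are hypothetical stand-ins (the last one for BoundedWindow.TIMESTAMP_MIN_VALUE).

```java
import java.util.TreeMap;

/** Hypothetical model of a timestamp-ordered state cell; not Beam's OrderedListState. */
final class ClearRangeSketch {
  // Stand-in for the smallest representable timestamp.
  static final long MIN_TIMESTAMP = Long.MIN_VALUE;

  /** Removes entries with start <= timestamp < end and returns how many entries remain. */
  static int clearRange(TreeMap<Long, String> state, long startInclusive, long endExclusive) {
    // subMap returns a live view, so clearing it removes the entries from the backing map.
    state.subMap(startInclusive, true, endExclusive, false).clear();
    return state.size();
  }

  /** Three entries: one before the epoch, one expired after the epoch, one still valid. */
  static TreeMap<Long, String> sampleState() {
    TreeMap<Long, String> state = new TreeMap<>();
    state.put(-5_000L, "before epoch");
    state.put(1_000L, "after epoch");
    state.put(9_000L, "still valid");
    return state;
  }
}
```

       Clearing from 0 up to 5000 leaves two entries behind (the pre-epoch one is never collected), while clearing from MIN_TIMESTAMP up to 5000 leaves only the entry that is still valid.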
   
   

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/EventTimeEquiJoin.java
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.join;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.state.OrderedListState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.TimerMap;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} on two PCollections. A
+ * {@link EventTimeEquiJoin} joins elements with equal keys bounded by the difference in event time.
+ *
+ * <p>Example of performing a {@link EventTimeEquiJoin}:
+ *
+ * <pre>{@code
+ * PCollection<KV<K, V1>> pt1 = ...;
+ * PCollection<KV<K, V2>> pt2 = ...;
+ *
+ * PCollection<KV<K, Pair<V1, V2>> eventTimeEquiJoinCollection =
+ *   pt1.apply(EventTimeEquiJoin.<K, V1, V2>of(pt2));
+ *
+ * @param secondCollection the second collection to use in the join.
+ * @param <K> the type of the keys in the input {@code PCollection}s
+ * @param <V1> the type of the value in the first {@code PCollection}
+ * @param <V2> the type of the value in the second {@code PCollection}
+ * </pre>
+ */
+@AutoValue
+public abstract class EventTimeEquiJoin<K, V1, V2>

Review comment:
       Would like to think on the name a bit more, as EventTimeEquiJoin seems a bit awkward to me.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/Pair.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.join;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
+
+/** A pair of two different objects. */
+@DefaultSchema(AutoValueSchema.class)

Review comment:
       Given that schema inference will not currently infer V1 and V2 properly, I'm not sure we should specify DefaultSchema here.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/EventTimeEquiJoin.java
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.join;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.state.OrderedListState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.TimerMap;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} on two PCollections. A
+ * {@link EventTimeEquiJoin} joins elements with equal keys bounded by the difference in event time.
+ *
+ * <p>Example of performing a {@link EventTimeEquiJoin}:
+ *
+ * <pre>{@code
+ * PCollection<KV<K, V1>> pt1 = ...;
+ * PCollection<KV<K, V2>> pt2 = ...;
+ *
+ * PCollection<KV<K, Pair<V1, V2>> eventTimeEquiJoinCollection =
+ *   pt1.apply(EventTimeEquiJoin.<K, V1, V2>of(pt2));
+ *
+ * @param secondCollection the second collection to use in the join.
+ * @param <K> the type of the keys in the input {@code PCollection}s
+ * @param <V1> the type of the value in the first {@code PCollection}
+ * @param <V2> the type of the value in the second {@code PCollection}
+ * </pre>
+ */
+@AutoValue
+public abstract class EventTimeEquiJoin<K, V1, V2>
+    extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, Pair<V1, V2>>>> {
+  /** Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} on two PCollections. */
+  public static <K, V1, V2> EventTimeEquiJoin<K, V1, V2> of(
+      PCollection<KV<K, V2>> secondCollection) {
+    return new AutoValue_EventTimeEquiJoin.Builder<K, V1, V2>()
+        .setSecondCollection(secondCollection)
+        .setFirstCollectionValidFor(Duration.ZERO)
+        .setSecondCollectionValidFor(Duration.ZERO)
+        .setAllowedLateness(Duration.ZERO)
+        .build();
+  }
+
+  abstract PCollection<KV<K, V2>> getSecondCollection();
+
+  abstract Duration getFirstCollectionValidFor();
+
+  abstract Duration getSecondCollectionValidFor();
+
+  abstract Duration getAllowedLateness();
+
+  abstract Builder<K, V1, V2> toBuilder();
+
+  @AutoValue.Builder
+  public abstract static class Builder<K, V1, V2> {
+    public abstract Builder<K, V1, V2> setSecondCollection(PCollection<KV<K, V2>> value);
+
+    public abstract Builder<K, V1, V2> setFirstCollectionValidFor(Duration value);
+
+    public abstract Builder<K, V1, V2> setSecondCollectionValidFor(Duration value);
+
+    public abstract Builder<K, V1, V2> setAllowedLateness(Duration value);
+
+    abstract EventTimeEquiJoin<K, V1, V2> build();
+  }
+
+  /**
+   * Returns a {@code Impl<K, V1, V2>} {@code PTransform} that matches elements from the first and
+   * second collection with the same keys if their timestamps are within the given interval.
+   *
+   * @param interval the allowed difference between the timestamps to allow a match
+   */
+  public EventTimeEquiJoin<K, V1, V2> within(Duration interval) {
+    return toBuilder()
+        .setFirstCollectionValidFor(interval)
+        .setSecondCollectionValidFor(interval)
+        .build();
+  }
+
+  /**
+   * Returns a {@code Impl<K, V1, V2>} {@code PTransform} that matches elements from the first and
+   * second collection with the same keys if the collection's element comes within the valid time
+   * range for the other collection.
+   *
+   * @param firstCollectionValidFor the valid time range for the first collection
+   * @param secondCollectionValidFor the valid time range for the second collection
+   */
+  public EventTimeEquiJoin<K, V1, V2> within(
+      Duration firstCollectionValidFor, Duration secondCollectionValidFor) {
+    return toBuilder()
+        .setFirstCollectionValidFor(firstCollectionValidFor)
+        .setSecondCollectionValidFor(secondCollectionValidFor)
+        .build();
+  }
+
+  /**
+   * Returns a {@code Impl<K, V1, V2>} {@code PTransform} that matches elements from the first and
+   * second collection with the same keys and allows for late elements
+   *
+   * @param allowedLateness the amount of time late elements are allowed.
+   */
+  public EventTimeEquiJoin<K, V1, V2> withAllowedLateness(Duration allowedLateness) {
+    return toBuilder().setAllowedLateness(allowedLateness).build();
+  }
+
+  @Override
+  public PCollection<KV<K, Pair<V1, V2>>> expand(PCollection<KV<K, V1>> input) {
+    Coder<K> keyCoder = JoinUtils.getKeyCoder(input);
+    Coder<V1> firstValueCoder = JoinUtils.getValueCoder(input);
+    Coder<V2> secondValueCoder = JoinUtils.getValueCoder(getSecondCollection());
+    UnionCoder unionCoder = UnionCoder.of(ImmutableList.of(firstValueCoder, secondValueCoder));
+    KvCoder<K, RawUnionValue> kvCoder = KvCoder.of(JoinUtils.getKeyCoder(input), unionCoder);
+    PCollectionList<KV<K, RawUnionValue>> union =
+        PCollectionList.of(JoinUtils.makeUnionTable(0, input, kvCoder))
+            .and(JoinUtils.makeUnionTable(1, getSecondCollection(), kvCoder));
+    return union
+        .apply("Flatten", Flatten.pCollections())
+        .apply(
+            "Join",
+            ParDo.of(
+                new EventTimeEquiJoinDoFn<>(
+                    firstValueCoder,
+                    secondValueCoder,
+                    getFirstCollectionValidFor(),
+                    getSecondCollectionValidFor(),
+                    getAllowedLateness())))
+        .setCoder(KvCoder.of(keyCoder, PairCoder.<V1, V2>of(firstValueCoder, secondValueCoder)));
+  }
+
+  private static class EventTimeEquiJoinDoFn<K, V1, V2>
+      extends DoFn<KV<K, RawUnionValue>, KV<K, Pair<V1, V2>>> {
+    private static final int FIRST_TAG = 0;
+    private static final int SECOND_TAG = 1;
+
+    // Bucket cleanup timers into TIMER_BUCKET length buckets.
+    private static final long TIMER_BUCKET = Duration.standardMinutes(1).getMillis();
+
+    // How long elements in the first and second collection are valid (can be matched) for.
+    private final Duration firstCollectionValidFor;
+    private final Duration secondCollectionValidFor;
+
+    // How long past the watermark that late elements can show up.
+    private final Duration allowedLateness;
+
+    @StateId("v1Items")
+    private final StateSpec<OrderedListState<V1>> firstCollectionItems;
+
+    @StateId("v2Items")
+    private final StateSpec<OrderedListState<V2>> secondCollectionItems;
+
+    @TimerFamily("cleanupTimers")
+    private final TimerSpec cleanupTimers = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
+
+    public EventTimeEquiJoinDoFn(
+        Coder<V1> firstValueCoder,
+        Coder<V2> secondValueCoder,
+        Duration firstValidFor,
+        Duration secondValidFor,
+        Duration allowedLateness) {
+      this.firstCollectionValidFor = firstValidFor;
+      this.secondCollectionValidFor = secondValidFor;
+      this.allowedLateness = allowedLateness;
+      firstCollectionItems = StateSpecs.orderedList(firstValueCoder);
+      secondCollectionItems = StateSpecs.orderedList(secondValueCoder);
+    }
+
+    @FunctionalInterface
+    private interface Output<T1, T2> {
+      void apply(T1 one, T2 two);
+    }
+
+    private <T, O> void processHelper(
+        Output<T, O> output,
+        KV<K, RawUnionValue> element,
+        Instant ts,
+        OrderedListState<T> thisCollection,
+        OrderedListState<O> otherCollection,
+        TimerMap cleanupTimers,
+        Duration thisCollectionValidFor,
+        Duration otherCollectionValidFor) {
+      thisCollection.add(TimestampedValue.of((T) element.getValue().getValue(), ts));
+      Instant beginning = ts.minus(otherCollectionValidFor);
+      Instant end = ts.plus(thisCollectionValidFor).plus(1L);
+      for (TimestampedValue<O> value : otherCollection.readRange(beginning, end)) {
+        output.apply((T) element.getValue().getValue(), value.getValue());
+      }
+      Instant cleanupTime = ts.plus(allowedLateness).plus(thisCollectionValidFor);
+      Instant nextBucketStart =
+          Instant.ofEpochMilli(
+              cleanupTime.getMillis() / TIMER_BUCKET * TIMER_BUCKET + TIMER_BUCKET);
+      cleanupTimers.get(Long.toString(nextBucketStart.getMillis())).set(nextBucketStart);
+    }
+
+    @ProcessElement
+    public void process(
+        ProcessContext context,
+        @Element KV<K, RawUnionValue> element,
+        @Timestamp Instant ts,
+        @StateId("v1Items") OrderedListState<V1> firstItems,
+        @StateId("v2Items") OrderedListState<V2> secondItems,
+        @TimerFamily("cleanupTimers") TimerMap cleanupTimers) {
+      switch (element.getValue().getUnionTag()) {
+        case FIRST_TAG:
+          processHelper(
+              (V1 v1, V2 v2) -> {
+                context.output(KV.of(element.getKey(), Pair.of(v1, v2)));
+              },
+              element,
+              ts,
+              firstItems,
+              secondItems,
+              cleanupTimers,
+              firstCollectionValidFor,
+              secondCollectionValidFor);
+          break;
+        case SECOND_TAG:
+          processHelper(
+              (V2 v2, V1 v1) -> {
+                context.output(KV.of(element.getKey(), Pair.of(v1, v2)));
+              },
+              element,
+              ts,
+              secondItems,
+              firstItems,
+              cleanupTimers,
+              secondCollectionValidFor,
+              firstCollectionValidFor);

Review comment:
       default:
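
       A minimal sketch of what such a default branch might look like, in plain Java with no Beam dependencies. The class `UnionTagDispatch` and the method `describe` are hypothetical stand-ins for the switch in `process`; only the tag values 0 and 1 come from the diff. Throwing on an unexpected tag is one reasonable choice, not necessarily what the author ended up doing.

```java
class UnionTagDispatch {
  private static final int FIRST_TAG = 0;
  private static final int SECOND_TAG = 1;

  // Hypothetical stand-in for the switch in process(): maps a union tag to a label.
  static String describe(int unionTag) {
    switch (unionTag) {
      case FIRST_TAG:
        return "first";
      case SECOND_TAG:
        return "second";
      default:
        // Fail loudly instead of silently dropping an element with an unknown tag.
        throw new IllegalStateException("Unexpected union tag: " + unionTag);
    }
  }

  public static void main(String[] args) {
    System.out.println(describe(FIRST_TAG));
    System.out.println(describe(SECOND_TAG));
  }
}
```

       With a default branch like this, a tag outside the two expected values surfaces as an error at the point of dispatch rather than producing no output.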

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/EventTimeEquiJoin.java
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.join;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.state.OrderedListState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.TimerMap;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} on two PCollections. A
+ * {@link EventTimeEquiJoin} joins elements with equal keys bounded by the difference in event time.
+ *
+ * <p>Example of performing a {@link EventTimeEquiJoin}:
+ *
+ * <pre>{@code
+ * PCollection<KV<K, V1>> pt1 = ...;
+ * PCollection<KV<K, V2>> pt2 = ...;
+ *
+ * PCollection<KV<K, Pair<V1, V2>> eventTimeEquiJoinCollection =
+ *   pt1.apply(EventTimeEquiJoin.<K, V1, V2>of(pt2));
+ *
+ * @param secondCollection the second collection to use in the join.
+ * @param <K> the type of the keys in the input {@code PCollection}s
+ * @param <V1> the type of the value in the first {@code PCollection}
+ * @param <V2> the type of the value in the second {@code PCollection}
+ * </pre>
+ */
+@AutoValue
+public abstract class EventTimeEquiJoin<K, V1, V2>

Review comment:
       I also wonder whether this PTransform should be expressed in terms of KVs. However, I'm starting to think that it should be, and then we have the option to wrap it in a higher-level Join transform.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/EventTimeEquiJoin.java
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.join;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.state.OrderedListState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.TimerMap;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} on two PCollections. A
+ * {@link EventTimeEquiJoin} joins elements with equal keys bounded by the difference in event time.
+ *
+ * <p>Example of performing a {@link EventTimeEquiJoin}:
+ *
+ * <pre>{@code
+ * PCollection<KV<K, V1>> pt1 = ...;
+ * PCollection<KV<K, V2>> pt2 = ...;
+ *
+ * PCollection<KV<K, Pair<V1, V2>> eventTimeEquiJoinCollection =
+ *   pt1.apply(EventTimeEquiJoin.<K, V1, V2>of(pt2));
+ *
+ * @param secondCollection the second collection to use in the join.
+ * @param <K> the type of the keys in the input {@code PCollection}s
+ * @param <V1> the type of the value in the first {@code PCollection}
+ * @param <V2> the type of the value in the second {@code PCollection}
+ * </pre>
+ */
+@AutoValue
+public abstract class EventTimeEquiJoin<K, V1, V2>
+    extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, Pair<V1, V2>>>> {
+  /** Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} on two PCollections. */
+  public static <K, V1, V2> EventTimeEquiJoin<K, V1, V2> of(
+      PCollection<KV<K, V2>> secondCollection) {
+    return new AutoValue_EventTimeEquiJoin.Builder<K, V1, V2>()
+        .setSecondCollection(secondCollection)
+        .setFirstCollectionValidFor(Duration.ZERO)
+        .setSecondCollectionValidFor(Duration.ZERO)
+        .setAllowedLateness(Duration.ZERO)
+        .build();
+  }
+
+  abstract PCollection<KV<K, V2>> getSecondCollection();
+
+  abstract Duration getFirstCollectionValidFor();
+
+  abstract Duration getSecondCollectionValidFor();
+
+  abstract Duration getAllowedLateness();
+
+  abstract Builder<K, V1, V2> toBuilder();
+
+  @AutoValue.Builder
+  public abstract static class Builder<K, V1, V2> {
+    public abstract Builder<K, V1, V2> setSecondCollection(PCollection<KV<K, V2>> value);
+
+    public abstract Builder<K, V1, V2> setFirstCollectionValidFor(Duration value);
+
+    public abstract Builder<K, V1, V2> setSecondCollectionValidFor(Duration value);
+
+    public abstract Builder<K, V1, V2> setAllowedLateness(Duration value);
+
+    abstract EventTimeEquiJoin<K, V1, V2> build();
+  }
+
+  /**
+   * Returns a {@code Impl<K, V1, V2>} {@code PTransform} that matches elements from the first and
+   * second collection with the same keys if their timestamps are within the given interval.
+   *
+   * @param interval the allowed difference between the timestamps to allow a match
+   */
+  public EventTimeEquiJoin<K, V1, V2> within(Duration interval) {
+    return toBuilder()
+        .setFirstCollectionValidFor(interval)
+        .setSecondCollectionValidFor(interval)
+        .build();
+  }
+
+  /**
+   * Returns a {@code Impl<K, V1, V2>} {@code PTransform} that matches elements from the first and
+   * second collection with the same keys if the collection's element comes within the valid time
+   * range for the other collection.
+   *
+   * @param firstCollectionValidFor the valid time range for the first collection
+   * @param secondCollectionValidFor the valid time range for the second collection
+   */
+  public EventTimeEquiJoin<K, V1, V2> within(
+      Duration firstCollectionValidFor, Duration secondCollectionValidFor) {
+    return toBuilder()
+        .setFirstCollectionValidFor(firstCollectionValidFor)
+        .setSecondCollectionValidFor(secondCollectionValidFor)
+        .build();
+  }
+
+  /**
+   * Returns a {@code Impl<K, V1, V2>} {@code PTransform} that matches elements from the first and
+   * second collection with the same keys and allows for late elements
+   *
+   * @param allowedLateness the amount of time late elements are allowed.
+   */
+  public EventTimeEquiJoin<K, V1, V2> withAllowedLateness(Duration allowedLateness) {
+    return toBuilder().setAllowedLateness(allowedLateness).build();
+  }
+
+  @Override
+  public PCollection<KV<K, Pair<V1, V2>>> expand(PCollection<KV<K, V1>> input) {
+    Coder<K> keyCoder = JoinUtils.getKeyCoder(input);
+    Coder<V1> firstValueCoder = JoinUtils.getValueCoder(input);
+    Coder<V2> secondValueCoder = JoinUtils.getValueCoder(getSecondCollection());
+    UnionCoder unionCoder = UnionCoder.of(ImmutableList.of(firstValueCoder, secondValueCoder));
+    KvCoder<K, RawUnionValue> kvCoder = KvCoder.of(JoinUtils.getKeyCoder(input), unionCoder);
+    PCollectionList<KV<K, RawUnionValue>> union =
+        PCollectionList.of(JoinUtils.makeUnionTable(0, input, kvCoder))
+            .and(JoinUtils.makeUnionTable(1, getSecondCollection(), kvCoder));
+    return union
+        .apply("Flatten", Flatten.pCollections())
+        .apply(
+            "Join",
+            ParDo.of(
+                new EventTimeEquiJoinDoFn<>(
+                    firstValueCoder,
+                    secondValueCoder,
+                    getFirstCollectionValidFor(),
+                    getSecondCollectionValidFor(),
+                    getAllowedLateness())))
+        .setCoder(KvCoder.of(keyCoder, PairCoder.<V1, V2>of(firstValueCoder, secondValueCoder)));
+  }
+
+  private static class EventTimeEquiJoinDoFn<K, V1, V2>
+      extends DoFn<KV<K, RawUnionValue>, KV<K, Pair<V1, V2>>> {
+    private static final int FIRST_TAG = 0;
+    private static final int SECOND_TAG = 1;
+
+    // Bucket cleanup timers into TIMER_BUCKET length buckets.
+    private static final long TIMER_BUCKET = Duration.standardMinutes(1).getMillis();
+
+    // How long elements in the first and second collection are valid (can be matched) for.
+    private final Duration firstCollectionValidFor;
+    private final Duration secondCollectionValidFor;
+
+    // How long past the watermark that late elements can show up.
+    private final Duration allowedLateness;
+
+    @StateId("v1Items")
+    private final StateSpec<OrderedListState<V1>> firstCollectionItems;
+
+    @StateId("v2Items")
+    private final StateSpec<OrderedListState<V2>> secondCollectionItems;
+
+    @TimerFamily("cleanupTimers")
+    private final TimerSpec cleanupTimers = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
+
+    public EventTimeEquiJoinDoFn(
+        Coder<V1> firstValueCoder,
+        Coder<V2> secondValueCoder,
+        Duration firstValidFor,
+        Duration secondValidFor,
+        Duration allowedLateness) {
+      this.firstCollectionValidFor = firstValidFor;
+      this.secondCollectionValidFor = secondValidFor;
+      this.allowedLateness = allowedLateness;
+      firstCollectionItems = StateSpecs.orderedList(firstValueCoder);
+      secondCollectionItems = StateSpecs.orderedList(secondValueCoder);
+    }
+
+    @FunctionalInterface
+    private interface Output<T1, T2> {
+      void apply(T1 one, T2 two);
+    }
+
+    private <T, O> void processHelper(
+        Output<T, O> output,
+        KV<K, RawUnionValue> element,
+        Instant ts,
+        OrderedListState<T> thisCollection,
+        OrderedListState<O> otherCollection,
+        TimerMap cleanupTimers,
+        Duration thisCollectionValidFor,
+        Duration otherCollectionValidFor) {
+      thisCollection.add(TimestampedValue.of((T) element.getValue().getValue(), ts));
+      Instant beginning = ts.minus(otherCollectionValidFor);
+      Instant end = ts.plus(thisCollectionValidFor).plus(1L);
+      for (TimestampedValue<O> value : otherCollection.readRange(beginning, end)) {
+        output.apply((T) element.getValue().getValue(), value.getValue());
+      }
+      Instant cleanupTime = ts.plus(allowedLateness).plus(thisCollectionValidFor);
+      Instant nextBucketStart =
+          Instant.ofEpochMilli(
+              cleanupTime.getMillis() / TIMER_BUCKET * TIMER_BUCKET + TIMER_BUCKET);
+      cleanupTimers.get(Long.toString(nextBucketStart.getMillis())).set(nextBucketStart);
+    }
+
+    @ProcessElement
+    public void process(
+        ProcessContext context,
+        @Element KV<K, RawUnionValue> element,
+        @Timestamp Instant ts,
+        @StateId("v1Items") OrderedListState<V1> firstItems,
+        @StateId("v2Items") OrderedListState<V2> secondItems,
+        @TimerFamily("cleanupTimers") TimerMap cleanupTimers) {
+      switch (element.getValue().getUnionTag()) {
+        case FIRST_TAG:
+          processHelper(
+              (V1 v1, V2 v2) -> {
+                context.output(KV.of(element.getKey(), Pair.of(v1, v2)));
+              },
+              element,
+              ts,
+              firstItems,
+              secondItems,
+              cleanupTimers,
+              firstCollectionValidFor,
+              secondCollectionValidFor);
+          break;
+        case SECOND_TAG:
+          processHelper(
+              (V2 v2, V1 v1) -> {
+                context.output(KV.of(element.getKey(), Pair.of(v1, v2)));
+              },
+              element,
+              ts,
+              secondItems,
+              firstItems,
+              cleanupTimers,
+              secondCollectionValidFor,
+              firstCollectionValidFor);
+      }
+    }
+
+    @OnTimerFamily("cleanupTimers")
+    public void onCleanupTimer(
+        @TimerId String timerId,
+        @StateId("v1Items") OrderedListState<V1> firstItems,
+        @StateId("v2Items") OrderedListState<V2> secondItems) {
+      Instant currentTime = Instant.ofEpochMilli(Long.valueOf(timerId)).minus(allowedLateness);
+      firstItems.clearRange(Instant.ofEpochMilli(0), currentTime.minus(firstCollectionValidFor));
+      secondItems.clearRange(Instant.ofEpochMilli(0), currentTime.minus(secondCollectionValidFor));

Review comment:
       ditto
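
       For context on the lines this comment is attached to, here is a small sketch of the bucket arithmetic the hunk uses for cleanup timers, written in plain Java with raw millisecond values instead of Joda `Instant`/`Duration`. The class and method names are hypothetical, timestamps are assumed to be non-negative, and the clear limit is treated as an exclusive bound, which is an assumption consistent with the `plus(1L)` applied to the read range in the hunk.

```java
class CleanupBuckets {
  // One-minute buckets, matching TIMER_BUCKET in the diff.
  static final long TIMER_BUCKET_MILLIS = 60_000L;

  // Start of the first bucket strictly after cleanupMillis (assumes cleanupMillis >= 0).
  static long nextBucketStart(long cleanupMillis) {
    return cleanupMillis / TIMER_BUCKET_MILLIS * TIMER_BUCKET_MILLIS + TIMER_BUCKET_MILLIS;
  }

  // Upper bound of the range cleared for one collection when the timer
  // registered for bucketStartMillis fires.
  static long clearLimit(long bucketStartMillis, long allowedLatenessMillis, long validForMillis) {
    return bucketStartMillis - allowedLatenessMillis - validForMillis;
  }

  public static void main(String[] args) {
    long ts = 10_000L;
    long allowedLateness = 5_000L;
    long validFor = 30_000L;
    long bucket = nextBucketStart(ts + allowedLateness + validFor);
    System.out.println("timer bucket: " + bucket);
    System.out.println("clear limit: " + clearLimit(bucket, allowedLateness, validFor));
  }
}
```

       Because the bucket start is always later than `ts + allowedLateness + validFor`, the clear limit computed when the timer fires is always later than `ts`, so the element that registered the timer falls inside the cleared range.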

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/EventTimeEquiJoin.java
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.join;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.state.OrderedListState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.TimerMap;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} on two PCollections. A
+ * {@link EventTimeEquiJoin} joins elements with equal keys bounded by the difference in event time.
+ *
+ * <p>Example of performing a {@link EventTimeEquiJoin}:
+ *
+ * <pre>{@code
+ * PCollection<KV<K, V1>> pt1 = ...;
+ * PCollection<KV<K, V2>> pt2 = ...;
+ *
+ * PCollection<KV<K, Pair<V1, V2>> eventTimeEquiJoinCollection =
+ *   pt1.apply(EventTimeEquiJoin.<K, V1, V2>of(pt2));
+ *
+ * @param secondCollection the second collection to use in the join.
+ * @param <K> the type of the keys in the input {@code PCollection}s
+ * @param <V1> the type of the value in the first {@code PCollection}
+ * @param <V2> the type of the value in the second {@code PCollection}
+ * </pre>
+ */
+@AutoValue
+public abstract class EventTimeEquiJoin<K, V1, V2>
+    extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, Pair<V1, V2>>>> {
+  /** Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} on two PCollections. */
+  public static <K, V1, V2> EventTimeEquiJoin<K, V1, V2> of(
+      PCollection<KV<K, V2>> secondCollection) {
+    return new AutoValue_EventTimeEquiJoin.Builder<K, V1, V2>()
+        .setSecondCollection(secondCollection)
+        .setFirstCollectionValidFor(Duration.ZERO)
+        .setSecondCollectionValidFor(Duration.ZERO)
+        .setAllowedLateness(Duration.ZERO)
+        .build();
+  }
+
+  abstract PCollection<KV<K, V2>> getSecondCollection();
+
+  abstract Duration getFirstCollectionValidFor();
+
+  abstract Duration getSecondCollectionValidFor();
+
+  abstract Duration getAllowedLateness();
+
+  abstract Builder<K, V1, V2> toBuilder();
+
+  @AutoValue.Builder
+  public abstract static class Builder<K, V1, V2> {
+    public abstract Builder<K, V1, V2> setSecondCollection(PCollection<KV<K, V2>> value);
+
+    public abstract Builder<K, V1, V2> setFirstCollectionValidFor(Duration value);
+
+    public abstract Builder<K, V1, V2> setSecondCollectionValidFor(Duration value);
+
+    public abstract Builder<K, V1, V2> setAllowedLateness(Duration value);
+
+    abstract EventTimeEquiJoin<K, V1, V2> build();
+  }
+
+  /**
+   * Returns a {@code Impl<K, V1, V2>} {@code PTransform} that matches elements from the first and
+   * second collection with the same keys if their timestamps are within the given interval.
+   *
+   * @param interval the allowed difference between the timestamps to allow a match
+   */
+  public EventTimeEquiJoin<K, V1, V2> within(Duration interval) {
+    return toBuilder()
+        .setFirstCollectionValidFor(interval)
+        .setSecondCollectionValidFor(interval)
+        .build();
+  }
+
+  /**
+   * Returns a {@code Impl<K, V1, V2>} {@code PTransform} that matches elements from the first and
+   * second collection with the same keys if the collection's element comes within the valid time
+   * range for the other collection.
+   *
+   * @param firstCollectionValidFor the valid time range for the first collection
+   * @param secondCollectionValidFor the valid time range for the second collection
+   */
+  public EventTimeEquiJoin<K, V1, V2> within(
+      Duration firstCollectionValidFor, Duration secondCollectionValidFor) {
+    return toBuilder()
+        .setFirstCollectionValidFor(firstCollectionValidFor)
+        .setSecondCollectionValidFor(secondCollectionValidFor)
+        .build();
+  }
+
+  /**
+   * Returns a {@code Impl<K, V1, V2>} {@code PTransform} that matches elements from the first and
+   * second collection with the same keys and allows for late elements
+   *
+   * @param allowedLateness the amount of time late elements are allowed.
+   */
+  public EventTimeEquiJoin<K, V1, V2> withAllowedLateness(Duration allowedLateness) {
+    return toBuilder().setAllowedLateness(allowedLateness).build();
+  }
+
+  @Override
+  public PCollection<KV<K, Pair<V1, V2>>> expand(PCollection<KV<K, V1>> input) {
+    Coder<K> keyCoder = JoinUtils.getKeyCoder(input);
+    Coder<V1> firstValueCoder = JoinUtils.getValueCoder(input);
+    Coder<V2> secondValueCoder = JoinUtils.getValueCoder(getSecondCollection());
+    UnionCoder unionCoder = UnionCoder.of(ImmutableList.of(firstValueCoder, secondValueCoder));
+    KvCoder<K, RawUnionValue> kvCoder = KvCoder.of(JoinUtils.getKeyCoder(input), unionCoder);
+    PCollectionList<KV<K, RawUnionValue>> union =
+        PCollectionList.of(JoinUtils.makeUnionTable(0, input, kvCoder))
+            .and(JoinUtils.makeUnionTable(1, getSecondCollection(), kvCoder));
+    return union
+        .apply("Flatten", Flatten.pCollections())
+        .apply(
+            "Join",
+            ParDo.of(
+                new EventTimeEquiJoinDoFn<>(
+                    firstValueCoder,
+                    secondValueCoder,
+                    getFirstCollectionValidFor(),
+                    getSecondCollectionValidFor(),
+                    getAllowedLateness())))
+        .setCoder(KvCoder.of(keyCoder, PairCoder.<V1, V2>of(firstValueCoder, secondValueCoder)));
+  }
+
+  private static class EventTimeEquiJoinDoFn<K, V1, V2>
+      extends DoFn<KV<K, RawUnionValue>, KV<K, Pair<V1, V2>>> {
+    private static final int FIRST_TAG = 0;
+    private static final int SECOND_TAG = 1;
+
+    // Bucket cleanup timers into TIMER_BUCKET length buckets.
+    private static final long TIMER_BUCKET = Duration.standardMinutes(1).getMillis();
+
+    // How long elements in the first and second collection are valid (can be matched) for.
+    private final Duration firstCollectionValidFor;
+    private final Duration secondCollectionValidFor;
+
+    // How long past the watermark that late elements can show up.
+    private final Duration allowedLateness;
+
+    @StateId("v1Items")
+    private final StateSpec<OrderedListState<V1>> firstCollectionItems;
+
+    @StateId("v2Items")
+    private final StateSpec<OrderedListState<V2>> secondCollectionItems;
+
+    @TimerFamily("cleanupTimers")
+    private final TimerSpec cleanupTimers = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
+
+    public EventTimeEquiJoinDoFn(
+        Coder<V1> firstValueCoder,
+        Coder<V2> secondValueCoder,
+        Duration firstValidFor,
+        Duration secondValidFor,
+        Duration allowedLateness) {
+      this.firstCollectionValidFor = firstValidFor;
+      this.secondCollectionValidFor = secondValidFor;
+      this.allowedLateness = allowedLateness;
+      firstCollectionItems = StateSpecs.orderedList(firstValueCoder);
+      secondCollectionItems = StateSpecs.orderedList(secondValueCoder);
+    }
+
+    @FunctionalInterface
+    private interface Output<T1, T2> {
+      void apply(T1 one, T2 two);
+    }
+
+    private <T, O> void processHelper(
+        Output<T, O> output,
+        KV<K, RawUnionValue> element,
+        Instant ts,
+        OrderedListState<T> thisCollection,
+        OrderedListState<O> otherCollection,
+        TimerMap cleanupTimers,
+        Duration thisCollectionValidFor,
+        Duration otherCollectionValidFor) {
+      thisCollection.add(TimestampedValue.of((T) element.getValue().getValue(), ts));
+      Instant beginning = ts.minus(otherCollectionValidFor);
+      Instant end = ts.plus(thisCollectionValidFor).plus(1L);
+      for (TimestampedValue<O> value : otherCollection.readRange(beginning, end)) {
+        output.apply((T) element.getValue().getValue(), value.getValue());
+      }
+      Instant cleanupTime = ts.plus(allowedLateness).plus(thisCollectionValidFor);
+      Instant nextBucketStart =
+          Instant.ofEpochMilli(
+              cleanupTime.getMillis() / TIMER_BUCKET * TIMER_BUCKET + TIMER_BUCKET);
+      cleanupTimers.get(Long.toString(nextBucketStart.getMillis())).set(nextBucketStart);
+    }
+
+    @ProcessElement
+    public void process(
+        ProcessContext context,

Review comment:
       No need for ProcessContext; just add an OutputReceiver parameter instead.
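
       A rough sketch of the shape of this change, in plain Java so that it runs without Beam on the classpath. The nested `OutputReceiver` interface here is a local stand-in that mirrors the `output(T)` method of Beam's `DoFn.OutputReceiver<T>`; it is not the Beam type, and `OutputReceiverSketch` and its `process` method are hypothetical. In the actual DoFn the parameter would presumably be typed as `OutputReceiver<KV<K, Pair<V1, V2>>>`.

```java
import java.util.ArrayList;
import java.util.List;

class OutputReceiverSketch {
  // Local stand-in mirroring the shape of Beam's DoFn.OutputReceiver<T>; not the Beam type.
  interface OutputReceiver<T> {
    void output(T value);
  }

  // Hypothetical analogue of process(): results go to the receiver parameter,
  // so no broader context object is needed just to emit output.
  static void process(String key, int left, int right, OutputReceiver<String> out) {
    out.output(key + "=(" + left + "," + right + ")");
  }

  public static void main(String[] args) {
    List<String> collected = new ArrayList<>();
    process("k", 1, 2, collected::add);
    System.out.println(collected);
  }
}
```

       The join lambdas in the hunk would then call `out.output(...)` where they currently call `context.output(...)`.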

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/EventTimeEquiJoin.java
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.join;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.state.OrderedListState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.TimerMap;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} on two PCollections. A
+ * {@link EventTimeEquiJoin} joins elements with equal keys bounded by the difference in event time.
+ *
+ * <p>Example of performing a {@link EventTimeEquiJoin}:
+ *
+ * <pre>{@code
+ * PCollection<KV<K, V1>> pt1 = ...;
+ * PCollection<KV<K, V2>> pt2 = ...;
+ *
+ * PCollection<KV<K, Pair<V1, V2>> eventTimeEquiJoinCollection =
+ *   pt1.apply(EventTimeEquiJoin.<K, V1, V2>of(pt2));
+ *
+ * @param secondCollection the second collection to use in the join.
+ * @param <K> the type of the keys in the input {@code PCollection}s
+ * @param <V1> the type of the value in the first {@code PCollection}
+ * @param <V2> the type of the value in the second {@code PCollection}
+ * </pre>
+ */
+@AutoValue
+public abstract class EventTimeEquiJoin<K, V1, V2>
+    extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, Pair<V1, V2>>>> {
+  /** Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} on two PCollections. */
+  public static <K, V1, V2> EventTimeEquiJoin<K, V1, V2> of(
+      PCollection<KV<K, V2>> secondCollection) {
+    return new AutoValue_EventTimeEquiJoin.Builder<K, V1, V2>()
+        .setSecondCollection(secondCollection)
+        .setFirstCollectionValidFor(Duration.ZERO)
+        .setSecondCollectionValidFor(Duration.ZERO)
+        .setAllowedLateness(Duration.ZERO)
+        .build();
+  }
+
+  abstract PCollection<KV<K, V2>> getSecondCollection();
+
+  abstract Duration getFirstCollectionValidFor();
+
+  abstract Duration getSecondCollectionValidFor();
+
+  abstract Duration getAllowedLateness();
+
+  abstract Builder<K, V1, V2> toBuilder();
+
+  @AutoValue.Builder
+  public abstract static class Builder<K, V1, V2> {
+    public abstract Builder<K, V1, V2> setSecondCollection(PCollection<KV<K, V2>> value);
+
+    public abstract Builder<K, V1, V2> setFirstCollectionValidFor(Duration value);
+
+    public abstract Builder<K, V1, V2> setSecondCollectionValidFor(Duration value);
+
+    public abstract Builder<K, V1, V2> setAllowedLateness(Duration value);
+
+    abstract EventTimeEquiJoin<K, V1, V2> build();
+  }
+
+  /**
+   * Returns a {@code Impl<K, V1, V2>} {@code PTransform} that matches elements from the first and
+   * second collection with the same keys if their timestamps are within the given interval.
+   *
+   * @param interval the allowed difference between the timestamps to allow a match
+   */
+  public EventTimeEquiJoin<K, V1, V2> within(Duration interval) {
+    return toBuilder()
+        .setFirstCollectionValidFor(interval)
+        .setSecondCollectionValidFor(interval)
+        .build();
+  }
+
+  /**
+   * Returns a {@code Impl<K, V1, V2>} {@code PTransform} that matches elements from the first and
+   * second collection with the same keys if the collection's element comes within the valid time
+   * range for the other collection.
+   *
+   * @param firstCollectionValidFor the valid time range for the first collection
+   * @param secondCollectionValidFor the valid time range for the second collection
+   */
+  public EventTimeEquiJoin<K, V1, V2> within(
+      Duration firstCollectionValidFor, Duration secondCollectionValidFor) {
+    return toBuilder()
+        .setFirstCollectionValidFor(firstCollectionValidFor)
+        .setSecondCollectionValidFor(secondCollectionValidFor)
+        .build();
+  }
+
+  /**
+   * Returns a {@code Impl<K, V1, V2>} {@code PTransform} that matches elements from the first and
+   * second collection with the same keys and allows for late elements
+   *
+   * @param allowedLateness the amount of time late elements are allowed.
+   */
+  public EventTimeEquiJoin<K, V1, V2> withAllowedLateness(Duration allowedLateness) {
+    return toBuilder().setAllowedLateness(allowedLateness).build();
+  }
+
+  @Override
+  public PCollection<KV<K, Pair<V1, V2>>> expand(PCollection<KV<K, V1>> input) {
+    Coder<K> keyCoder = JoinUtils.getKeyCoder(input);
+    Coder<V1> firstValueCoder = JoinUtils.getValueCoder(input);
+    Coder<V2> secondValueCoder = JoinUtils.getValueCoder(getSecondCollection());
+    UnionCoder unionCoder = UnionCoder.of(ImmutableList.of(firstValueCoder, secondValueCoder));
+    KvCoder<K, RawUnionValue> kvCoder = KvCoder.of(JoinUtils.getKeyCoder(input), unionCoder);
+    PCollectionList<KV<K, RawUnionValue>> union =
+        PCollectionList.of(JoinUtils.makeUnionTable(0, input, kvCoder))
+            .and(JoinUtils.makeUnionTable(1, getSecondCollection(), kvCoder));
+    return union
+        .apply("Flatten", Flatten.pCollections())
+        .apply(
+            "Join",
+            ParDo.of(
+                new EventTimeEquiJoinDoFn<>(
+                    firstValueCoder,
+                    secondValueCoder,
+                    getFirstCollectionValidFor(),
+                    getSecondCollectionValidFor(),
+                    getAllowedLateness())))
+        .setCoder(KvCoder.of(keyCoder, PairCoder.<V1, V2>of(firstValueCoder, secondValueCoder)));
+  }
+
+  private static class EventTimeEquiJoinDoFn<K, V1, V2>
+      extends DoFn<KV<K, RawUnionValue>, KV<K, Pair<V1, V2>>> {
+    private static final int FIRST_TAG = 0;
+    private static final int SECOND_TAG = 1;
+
+    // Bucket cleanup timers into TIMER_BUCKET length buckets.
+    private static final long TIMER_BUCKET = Duration.standardMinutes(1).getMillis();
+
+    // How long elements in the first and second collection are valid (can be matched) for.
+    private final Duration firstCollectionValidFor;
+    private final Duration secondCollectionValidFor;
+
+    // How long past the watermark that late elements can show up.
+    private final Duration allowedLateness;
+
+    @StateId("v1Items")
+    private final StateSpec<OrderedListState<V1>> firstCollectionItems;
+
+    @StateId("v2Items")
+    private final StateSpec<OrderedListState<V2>> secondCollectionItems;
+
+    @TimerFamily("cleanupTimers")
+    private final TimerSpec cleanupTimers = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
+
+    public EventTimeEquiJoinDoFn(
+        Coder<V1> firstValueCoder,
+        Coder<V2> secondValueCoder,
+        Duration firstValidFor,
+        Duration secondValidFor,
+        Duration allowedLateness) {
+      this.firstCollectionValidFor = firstValidFor;
+      this.secondCollectionValidFor = secondValidFor;
+      this.allowedLateness = allowedLateness;
+      firstCollectionItems = StateSpecs.orderedList(firstValueCoder);
+      secondCollectionItems = StateSpecs.orderedList(secondValueCoder);
+    }
+
+    @FunctionalInterface
+    private interface Output<T1, T2> {
+      void apply(T1 one, T2 two);
+    }
+
+    private <T, O> void processHelper(
+        Output<T, O> output,
+        KV<K, RawUnionValue> element,
+        Instant ts,
+        OrderedListState<T> thisCollection,
+        OrderedListState<O> otherCollection,
+        TimerMap cleanupTimers,
+        Duration thisCollectionValidFor,
+        Duration otherCollectionValidFor) {
+      thisCollection.add(TimestampedValue.of((T) element.getValue().getValue(), ts));
+      Instant beginning = ts.minus(otherCollectionValidFor);
+      Instant end = ts.plus(thisCollectionValidFor).plus(1L);
+      for (TimestampedValue<O> value : otherCollection.readRange(beginning, end)) {
+        output.apply((T) element.getValue().getValue(), value.getValue());
+      }
+      Instant cleanupTime = ts.plus(allowedLateness).plus(thisCollectionValidFor);
+      Instant nextBucketStart =
+          Instant.ofEpochMilli(
+              cleanupTime.getMillis() / TIMER_BUCKET * TIMER_BUCKET + TIMER_BUCKET);
+      cleanupTimers.get(Long.toString(nextBucketStart.getMillis())).set(nextBucketStart);
+    }
+
+    @ProcessElement
+    public void process(
+        ProcessContext context,
+        @Element KV<K, RawUnionValue> element,
+        @Timestamp Instant ts,
+        @StateId("v1Items") OrderedListState<V1> firstItems,
+        @StateId("v2Items") OrderedListState<V2> secondItems,
+        @TimerFamily("cleanupTimers") TimerMap cleanupTimers) {
+      switch (element.getValue().getUnionTag()) {
+        case FIRST_TAG:
+          processHelper(
+              (V1 v1, V2 v2) -> {
+                context.output(KV.of(element.getKey(), Pair.of(v1, v2)));
+              },
+              element,
+              ts,
+              firstItems,
+              secondItems,
+              cleanupTimers,
+              firstCollectionValidFor,
+              secondCollectionValidFor);
+          break;
+        case SECOND_TAG:
+          processHelper(
+              (V2 v2, V1 v1) -> {
+                context.output(KV.of(element.getKey(), Pair.of(v1, v2)));
+              },
+              element,
+              ts,
+              secondItems,
+              firstItems,
+              cleanupTimers,
+              secondCollectionValidFor,
+              firstCollectionValidFor);
+      }
+    }
+
+    @OnTimerFamily("cleanupTimers")
+    public void onCleanupTimer(
+        @TimerId String timerId,

Review comment:
       You could add a @Timestamp parameter; you don't need to parse it out of the String timerId.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/EventTimeEquiJoin.java
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.join;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.state.OrderedListState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.TimerMap;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} on two PCollections. A
+ * {@link EventTimeEquiJoin} joins elements with equal keys bounded by the difference in event time.
+ *
+ * <p>Example of performing a {@link EventTimeEquiJoin}:
+ *
+ * <pre>{@code
+ * PCollection<KV<K, V1>> pt1 = ...;
+ * PCollection<KV<K, V2>> pt2 = ...;
+ *
+ * PCollection<KV<K, Pair<V1, V2>> eventTimeEquiJoinCollection =
+ *   pt1.apply(EventTimeEquiJoin.<K, V1, V2>of(pt2));
+ *
+ * @param secondCollection the second collection to use in the join.
+ * @param <K> the type of the keys in the input {@code PCollection}s
+ * @param <V1> the type of the value in the first {@code PCollection}
+ * @param <V2> the type of the value in the second {@code PCollection}
+ * </pre>
+ */
+@AutoValue
+public abstract class EventTimeEquiJoin<K, V1, V2>
+    extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, Pair<V1, V2>>>> {
+  /** Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} on two PCollections. */
+  public static <K, V1, V2> EventTimeEquiJoin<K, V1, V2> of(
+      PCollection<KV<K, V2>> secondCollection) {
+    return new AutoValue_EventTimeEquiJoin.Builder<K, V1, V2>()
+        .setSecondCollection(secondCollection)
+        .setFirstCollectionValidFor(Duration.ZERO)
+        .setSecondCollectionValidFor(Duration.ZERO)
+        .setAllowedLateness(Duration.ZERO)
+        .build();
+  }
+
+  abstract PCollection<KV<K, V2>> getSecondCollection();
+
+  abstract Duration getFirstCollectionValidFor();
+
+  abstract Duration getSecondCollectionValidFor();
+
+  abstract Duration getAllowedLateness();
+
+  abstract Builder<K, V1, V2> toBuilder();
+
+  @AutoValue.Builder
+  public abstract static class Builder<K, V1, V2> {
+    public abstract Builder<K, V1, V2> setSecondCollection(PCollection<KV<K, V2>> value);
+
+    public abstract Builder<K, V1, V2> setFirstCollectionValidFor(Duration value);
+
+    public abstract Builder<K, V1, V2> setSecondCollectionValidFor(Duration value);
+
+    public abstract Builder<K, V1, V2> setAllowedLateness(Duration value);
+
+    abstract EventTimeEquiJoin<K, V1, V2> build();
+  }
+
+  /**
+   * Returns a {@code Impl<K, V1, V2>} {@code PTransform} that matches elements from the first and
+   * second collection with the same keys if their timestamps are within the given interval.
+   *
+   * @param interval the allowed difference between the timestamps to allow a match
+   */
+  public EventTimeEquiJoin<K, V1, V2> within(Duration interval) {
+    return toBuilder()
+        .setFirstCollectionValidFor(interval)
+        .setSecondCollectionValidFor(interval)
+        .build();
+  }
+
+  /**
+   * Returns a {@code Impl<K, V1, V2>} {@code PTransform} that matches elements from the first and
+   * second collection with the same keys if the collection's element comes within the valid time
+   * range for the other collection.
+   *
+   * @param firstCollectionValidFor the valid time range for the first collection
+   * @param secondCollectionValidFor the valid time range for the second collection
+   */
+  public EventTimeEquiJoin<K, V1, V2> within(
+      Duration firstCollectionValidFor, Duration secondCollectionValidFor) {
+    return toBuilder()
+        .setFirstCollectionValidFor(firstCollectionValidFor)
+        .setSecondCollectionValidFor(secondCollectionValidFor)
+        .build();
+  }
+
+  /**
+   * Returns a {@code Impl<K, V1, V2>} {@code PTransform} that matches elements from the first and
+   * second collection with the same keys and allows for late elements
+   *
+   * @param allowedLateness the amount of time late elements are allowed.
+   */
+  public EventTimeEquiJoin<K, V1, V2> withAllowedLateness(Duration allowedLateness) {
+    return toBuilder().setAllowedLateness(allowedLateness).build();
+  }
+
+  @Override
+  public PCollection<KV<K, Pair<V1, V2>>> expand(PCollection<KV<K, V1>> input) {
+    Coder<K> keyCoder = JoinUtils.getKeyCoder(input);
+    Coder<V1> firstValueCoder = JoinUtils.getValueCoder(input);
+    Coder<V2> secondValueCoder = JoinUtils.getValueCoder(getSecondCollection());
+    UnionCoder unionCoder = UnionCoder.of(ImmutableList.of(firstValueCoder, secondValueCoder));
+    KvCoder<K, RawUnionValue> kvCoder = KvCoder.of(JoinUtils.getKeyCoder(input), unionCoder);
+    PCollectionList<KV<K, RawUnionValue>> union =
+        PCollectionList.of(JoinUtils.makeUnionTable(0, input, kvCoder))
+            .and(JoinUtils.makeUnionTable(1, getSecondCollection(), kvCoder));
+    return union
+        .apply("Flatten", Flatten.pCollections())
+        .apply(
+            "Join",
+            ParDo.of(
+                new EventTimeEquiJoinDoFn<>(
+                    firstValueCoder,
+                    secondValueCoder,
+                    getFirstCollectionValidFor(),
+                    getSecondCollectionValidFor(),
+                    getAllowedLateness())))
+        .setCoder(KvCoder.of(keyCoder, PairCoder.<V1, V2>of(firstValueCoder, secondValueCoder)));
+  }
+
+  private static class EventTimeEquiJoinDoFn<K, V1, V2>
+      extends DoFn<KV<K, RawUnionValue>, KV<K, Pair<V1, V2>>> {
+    private static final int FIRST_TAG = 0;
+    private static final int SECOND_TAG = 1;
+
+    // Bucket cleanup timers into TIMER_BUCKET length buckets.
+    private static final long TIMER_BUCKET = Duration.standardMinutes(1).getMillis();
+
+    // How long elements in the first and second collection are valid (can be matched) for.
+    private final Duration firstCollectionValidFor;
+    private final Duration secondCollectionValidFor;
+
+    // How long past the watermark that late elements can show up.
+    private final Duration allowedLateness;
+
+    @StateId("v1Items")
+    private final StateSpec<OrderedListState<V1>> firstCollectionItems;
+
+    @StateId("v2Items")
+    private final StateSpec<OrderedListState<V2>> secondCollectionItems;
+
+    @TimerFamily("cleanupTimers")
+    private final TimerSpec cleanupTimers = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
+
+    public EventTimeEquiJoinDoFn(
+        Coder<V1> firstValueCoder,
+        Coder<V2> secondValueCoder,
+        Duration firstValidFor,
+        Duration secondValidFor,
+        Duration allowedLateness) {
+      this.firstCollectionValidFor = firstValidFor;
+      this.secondCollectionValidFor = secondValidFor;
+      this.allowedLateness = allowedLateness;
+      firstCollectionItems = StateSpecs.orderedList(firstValueCoder);
+      secondCollectionItems = StateSpecs.orderedList(secondValueCoder);
+    }
+
+    @FunctionalInterface
+    private interface Output<T1, T2> {
+      void apply(T1 one, T2 two);
+    }
+
+    private <T, O> void processHelper(
+        Output<T, O> output,
+        KV<K, RawUnionValue> element,
+        Instant ts,
+        OrderedListState<T> thisCollection,
+        OrderedListState<O> otherCollection,
+        TimerMap cleanupTimers,
+        Duration thisCollectionValidFor,
+        Duration otherCollectionValidFor) {
+      thisCollection.add(TimestampedValue.of((T) element.getValue().getValue(), ts));
+      Instant beginning = ts.minus(otherCollectionValidFor);
+      Instant end = ts.plus(thisCollectionValidFor).plus(1L);
+      for (TimestampedValue<O> value : otherCollection.readRange(beginning, end)) {
+        output.apply((T) element.getValue().getValue(), value.getValue());
+      }
+      Instant cleanupTime = ts.plus(allowedLateness).plus(thisCollectionValidFor);
+      Instant nextBucketStart =
+          Instant.ofEpochMilli(
+              cleanupTime.getMillis() / TIMER_BUCKET * TIMER_BUCKET + TIMER_BUCKET);
+      cleanupTimers.get(Long.toString(nextBucketStart.getMillis())).set(nextBucketStart);

Review comment:
       Ideally we should call withOutputTimestamp to hold the watermark back to the earliest buffered element in that timer's range. Doing this probably requires keeping a histogram in a ValueState that tracks the minimum timestamp per one-minute bucket.
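
The bookkeeping this suggestion implies could look roughly like the sketch below. It is plain Java, not Beam state code: the class `BucketMinTracker` and its methods are hypothetical, a `TreeMap` stands in for whatever would be stored in a `ValueState`, and timestamps are assumed to be non-negative epoch milliseconds. The bucket rounding copies the arithmetic in `processHelper`; the value returned by `record` is what one might pass to `withOutputTimestamp` when setting that bucket's timer.

```java
import java.util.TreeMap;

/** Hypothetical helper: tracks the earliest buffered timestamp per cleanup bucket. */
public class BucketMinTracker {
  // One-minute buckets, matching TIMER_BUCKET in the diff.
  public static final long TIMER_BUCKET_MILLIS = 60_000L;

  // Bucket start (millis) -> earliest element timestamp (millis) seen for that bucket.
  private final TreeMap<Long, Long> minTimestampPerBucket = new TreeMap<>();

  /** Start of the bucket following the one that contains cleanupMillis. */
  public static long nextBucketStart(long cleanupMillis) {
    return cleanupMillis / TIMER_BUCKET_MILLIS * TIMER_BUCKET_MILLIS + TIMER_BUCKET_MILLIS;
  }

  /** Records an element and returns the earliest timestamp buffered for its bucket. */
  public long record(long elementMillis, long cleanupMillis) {
    long bucket = nextBucketStart(cleanupMillis);
    return minTimestampPerBucket.merge(bucket, elementMillis, Math::min);
  }

  /** Drops the entry for a bucket once its cleanup timer has fired. */
  public void clear(long bucketStart) {
    minTimestampPerBucket.remove(bucketStart);
  }

  public static void main(String[] args) {
    BucketMinTracker tracker = new BucketMinTracker();
    System.out.println(nextBucketStart(61_000L));          // 120000
    System.out.println(tracker.record(50_000L, 61_000L));  // 50000
    System.out.println(tracker.record(45_000L, 70_000L));  // 45000
  }
}
```

Keeping one minimum per bucket, rather than a single global minimum, lets the hold advance when an early bucket's timer fires and its entry is cleared.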

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/EventTimeEquiJoin.java
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.join;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.state.OrderedListState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.TimerMap;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} on two PCollections. A
+ * {@link EventTimeEquiJoin} joins elements with equal keys bounded by the difference in event time.
+ *
+ * <p>Example of performing a {@link EventTimeEquiJoin}:
+ *
+ * <pre>{@code
+ * PCollection<KV<K, V1>> pt1 = ...;
+ * PCollection<KV<K, V2>> pt2 = ...;
+ *
+ * PCollection<KV<K, Pair<V1, V2>> eventTimeEquiJoinCollection =
+ *   pt1.apply(EventTimeEquiJoin.<K, V1, V2>of(pt2));

Review comment:
       This is currently just an inner join
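
To make the inner-join behaviour concrete, here is a small plain-Java sketch (not the Beam implementation) of which elements the range lookup in `processHelper` would pair with a newly arrived element for a single key. It assumes `readRange(min, limit)` includes `min` and excludes `limit`, which would explain the extra millisecond added in the diff. The names `Stamped` and `matches` and the sample values are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class InnerJoinWindowSketch {
  /** Hypothetical stand-in for a TimestampedValue: a value plus its event time in millis. */
  public static final class Stamped {
    public final String value;
    public final long ts;

    public Stamped(String value, long ts) {
      this.value = value;
      this.ts = ts;
    }
  }

  /**
   * Returns the values in {@code other} whose timestamps lie in
   * [ts - otherValidFor, ts + thisValidFor], both ends inclusive.
   */
  public static List<String> matches(
      long ts, long thisValidFor, long otherValidFor, List<Stamped> other) {
    long beginning = ts - otherValidFor;
    long end = ts + thisValidFor + 1; // exclusive limit, hence the +1
    List<String> out = new ArrayList<>();
    for (Stamped s : other) {
      if (s.ts >= beginning && s.ts < end) {
        out.add(s.value);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Stamped> second =
        Arrays.asList(new Stamped("a", 90), new Stamped("b", 100), new Stamped("c", 111));
    System.out.println(matches(100, 10, 10, second)); // [a, b]
    System.out.println(matches(500, 10, 10, second)); // []
  }
}
```

An element with no partner in range yields an empty list and therefore no output at all, which is the inner-join behaviour noted above. An outer join would also need to emit such elements, presumably when their cleanup timer fires.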




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] laraschmidt commented on a change in pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

Posted by GitBox <gi...@apache.org>.
laraschmidt commented on a change in pull request #15275:
URL: https://github.com/apache/beam/pull/15275#discussion_r683661850



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/Pair.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.join;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
+
+/** A pair of two different objects. */
+@DefaultSchema(AutoValueSchema.class)

Review comment:
       Forgot that was there. :)







[GitHub] [beam] github-actions[bot] commented on pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15275:
URL: https://github.com/apache/beam/pull/15275#issuecomment-1075137634


   This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.





[GitHub] [beam] laraschmidt commented on pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

Posted by GitBox <gi...@apache.org>.
laraschmidt commented on pull request #15275:
URL: https://github.com/apache/beam/pull/15275#issuecomment-1007579994









[GitHub] [beam] aaltay commented on pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

Posted by GitBox <gi...@apache.org>.
aaltay commented on pull request #15275:
URL: https://github.com/apache/beam/pull/15275#issuecomment-949164103


   @kennknowles - Could you please take a look?





[GitHub] [beam] laraschmidt commented on pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

Posted by GitBox <gi...@apache.org>.
laraschmidt commented on pull request #15275:
URL: https://github.com/apache/beam/pull/15275#issuecomment-941305146


   Addressed comments, PTAL. :) Thanks! @kennknowles 





[GitHub] [beam] kennknowles commented on a change in pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

Posted by GitBox <gi...@apache.org>.
kennknowles commented on a change in pull request #15275:
URL: https://github.com/apache/beam/pull/15275#discussion_r726554652



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/EventTimeEquiJoin.java
##########
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.join;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.state.OrderedListState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerMap;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} on two PCollections. A
+ * {@link EventTimeEquiJoin} joins elements with equal keys bounded by the difference in event time.
+ * Currently only inner join is supported.
+ *
+ * <p>Example of performing a {@link EventTimeEquiJoin}:
+ *
+ * <pre>{@code
+ * PCollection<KV<K, V1>> pt1 = ...;
+ * PCollection<KV<K, V2>> pt2 = ...;
+ *
+ * PCollection<KV<K, Pair<V1, V2>> eventTimeEquiJoinCollection =
+ *   pt1.apply(EventTimeEquiJoin.<K, V1, V2>of(pt2));
+ *
+ * @param <K> the type of the keys in the input {@code PCollection}s
+ * @param <V1> the type of the value in the first {@code PCollection}
+ * @param <V2> the type of the value in the second {@code PCollection}
+ * </pre>
+ */
+@AutoValue
+public abstract class EventTimeEquiJoin<K, V1, V2>
+    extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, Pair<V1, V2>>>> {
+
+  /* Where the output timestamp for each element is taken from. */
+  public enum OutputTimestampFrom {
+    FIRST_COLLECTION,
+    SECOND_COLLECTION,
+    MINIMUM_TIMESTAMP,
+    MAXIMUM_TIMESTAMP
+  }
+
+  /**
+   * Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} inner join on two
+   * PCollections.
+   */
+  public static <K, V1, V2> EventTimeEquiJoin<K, V1, V2> innerJoin(
+      PCollection<KV<K, V2>> secondCollection) {
+    return new AutoValue_EventTimeEquiJoin.Builder<K, V1, V2>()
+        .setSecondCollection(secondCollection)
+        .setFirstCollectionValidFor(Duration.ZERO)
+        .setSecondCollectionValidFor(Duration.ZERO)
+        .setAllowedLateness(Duration.ZERO)
+        .setOutputTimestampFrom(OutputTimestampFrom.MINIMUM_TIMESTAMP)
+        .build();
+  }
+
+  abstract PCollection<KV<K, V2>> getSecondCollection();
+
+  abstract Duration getFirstCollectionValidFor();
+
+  abstract Duration getSecondCollectionValidFor();
+
+  abstract Duration getAllowedLateness();
+
+  abstract OutputTimestampFrom getOutputTimestampFrom();
+
+  abstract Builder<K, V1, V2> toBuilder();
+
+  @AutoValue.Builder
+  public abstract static class Builder<K, V1, V2> {
+    public abstract Builder<K, V1, V2> setSecondCollection(PCollection<KV<K, V2>> value);
+
+    public abstract Builder<K, V1, V2> setFirstCollectionValidFor(Duration value);
+
+    public abstract Builder<K, V1, V2> setSecondCollectionValidFor(Duration value);
+
+    public abstract Builder<K, V1, V2> setAllowedLateness(Duration value);
+
+    public abstract Builder<K, V1, V2> setOutputTimestampFrom(
+        OutputTimestampFrom outputTimestampFrom);
+
+    abstract EventTimeEquiJoin<K, V1, V2> build();
+  }
+
+  /**
+   * Returns an {@link EventTimeEquiJoin} that matches elements from the first and second
+   * collection with the same keys if their timestamps are within the given interval.
+   *
+   * @param interval the allowed difference between the timestamps to allow a match
+   */
+  public EventTimeEquiJoin<K, V1, V2> within(Duration interval) {
+    return toBuilder()
+        .setFirstCollectionValidFor(interval)
+        .setSecondCollectionValidFor(interval)
+        .build();
+  }
+
+  /**
+   * Returns an {@link EventTimeEquiJoin} that matches elements from the first and second
+   * collection with the same keys if one collection's element falls within the valid time range
+   * for the other collection.
+   *
+   * @param firstCollectionValidFor the valid time range for the first collection
+   * @param secondCollectionValidFor the valid time range for the second collection
+   */
+  public EventTimeEquiJoin<K, V1, V2> within(
+      Duration firstCollectionValidFor, Duration secondCollectionValidFor) {
+    return toBuilder()
+        .setFirstCollectionValidFor(firstCollectionValidFor)
+        .setSecondCollectionValidFor(secondCollectionValidFor)
+        .build();
+  }
+
+  /**
+   * Returns an {@link EventTimeEquiJoin} that matches elements from the first and second
+   * collection with the same keys and allows for late elements.
+   *
+   * @param allowedLateness the amount of time late elements are allowed.
+   */
+  public EventTimeEquiJoin<K, V1, V2> withAllowedLateness(Duration allowedLateness) {
+    checkArgument(
+        allowedLateness.isLongerThan(Duration.ZERO),
+        "Allowed lateness for EventTimeEquiJoin must be positive.");
+    return toBuilder().setAllowedLateness(allowedLateness).build();
+  }
+
+  /**
+   * Returns an {@link EventTimeEquiJoin} that matches elements from the first and second
+   * collection with the same keys and takes each output timestamp from the given source.
+   *
+   * @param outputTimestampFrom where to pull the output timestamp from
+   */
+  public EventTimeEquiJoin<K, V1, V2> withOutputTimestampFrom(
+      OutputTimestampFrom outputTimestampFrom) {
+    return toBuilder().setOutputTimestampFrom(outputTimestampFrom).build();
+  }
+
+  @Override
+  public PCollection<KV<K, Pair<V1, V2>>> expand(PCollection<KV<K, V1>> input) {
+    Coder<K> keyCoder = JoinUtils.getKeyCoder(input);
+    Coder<V1> firstValueCoder = JoinUtils.getValueCoder(input);
+    Coder<V2> secondValueCoder = JoinUtils.getValueCoder(getSecondCollection());
+    UnionCoder unionCoder = UnionCoder.of(ImmutableList.of(firstValueCoder, secondValueCoder));
+    KvCoder<K, RawUnionValue> kvCoder = KvCoder.of(JoinUtils.getKeyCoder(input), unionCoder);
+    PCollectionList<KV<K, RawUnionValue>> union =
+        PCollectionList.of(JoinUtils.makeUnionTable(0, input, kvCoder))
+            .and(JoinUtils.makeUnionTable(1, getSecondCollection(), kvCoder));
+    return union
+        .apply("Flatten", Flatten.pCollections())
+        .apply(
+            "Join",
+            ParDo.of(
+                new EventTimeEquiJoinDoFn<>(
+                    firstValueCoder,
+                    secondValueCoder,
+                    getFirstCollectionValidFor(),
+                    getSecondCollectionValidFor(),
+                    getAllowedLateness(),
+                    getOutputTimestampFrom())))
+        .setCoder(KvCoder.of(keyCoder, PairCoder.<V1, V2>of(firstValueCoder, secondValueCoder)));
+  }
+
+  private static class EventTimeEquiJoinDoFn<K, V1, V2>
+      extends DoFn<KV<K, RawUnionValue>, KV<K, Pair<V1, V2>>> {
+    private static final int FIRST_TAG = 0;
+    private static final int SECOND_TAG = 1;
+
+    // Bucket cleanup timers into TIMER_BUCKET length buckets.
+    private static final long TIMER_BUCKET = Duration.standardMinutes(1).getMillis();
+
+    // How long elements in the first and second collection are valid (can be matched) for.
+    private final Duration firstCollectionValidFor;
+    private final Duration secondCollectionValidFor;
+
+    // How long past the watermark that late elements can show up.
+    private final Duration allowedLateness;
+
+    // How to generate the output timestamp.
+    private final OutputTimestampFrom outputTimestampFrom;
+
+    @StateId("firstItems")
+    private final StateSpec<OrderedListState<V1>> firstCollectionItems;
+
+    @StateId("secondItems")
+    private final StateSpec<OrderedListState<V2>> secondCollectionItems;
+
+    // Timestamp of the oldest element that has not already been cleaned up. Used to ensure we
+    // don't accept elements for timestamps we already cleaned up.
+    @StateId("oldestFirstTimestamp")
+    private final StateSpec<ValueState<Instant>> oldestFirstTimestamp;
+
+    @StateId("oldestSecondTimestamp")
+    private final StateSpec<ValueState<Instant>> oldestSecondTimestamp;
+
+    @TimerFamily("cleanupTimers")
+    private final TimerSpec cleanupTimers = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
+
+    // Watermark holds for elements in the first collection.
+    @TimerFamily("firstCollectionHolds")
+    private final TimerSpec firstCollectionHolds = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
+
+    // Watermark holds for elements in the second collection.
+    @TimerFamily("secondCollectionHolds")
+    private final TimerSpec secondCollectionHolds = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
+
+    public EventTimeEquiJoinDoFn(
+        Coder<V1> firstValueCoder,
+        Coder<V2> secondValueCoder,
+        Duration firstValidFor,
+        Duration secondValidFor,
+        Duration allowedLateness,
+        OutputTimestampFrom outputTimestampFrom) {
+      this.firstCollectionValidFor = firstValidFor;
+      this.secondCollectionValidFor = secondValidFor;
+      this.allowedLateness = allowedLateness;
+      this.outputTimestampFrom = outputTimestampFrom;
+      firstCollectionItems = StateSpecs.orderedList(firstValueCoder);
+      secondCollectionItems = StateSpecs.orderedList(secondValueCoder);
+      oldestFirstTimestamp = StateSpecs.value();
+      oldestSecondTimestamp = StateSpecs.value();
+    }
+
+    @FunctionalInterface
+    private interface Output<T1, T2> {
+      void output(T1 one, T2 two, Instant tsOne, Instant tsTwo);
+    }
+
+    /** Sets a timer that fires at the start of the next bucket after {@code time}. */
+    private Timer addTimer(TimerMap timers, Instant time) {
+      Instant nextBucketStart =
+          Instant.ofEpochMilli(time.getMillis() / TIMER_BUCKET * TIMER_BUCKET + TIMER_BUCKET);
+      Timer timer = timers.get(Long.toString(nextBucketStart.getMillis()));
+      timer.set(nextBucketStart);
+      return timer;
+    }
+
+    private <ThisT, OtherT> void processHelper(
+        Output<ThisT, OtherT> output,
+        KV<K, RawUnionValue> element,
+        Instant ts,
+        OrderedListState<ThisT> thisCollection,
+        OrderedListState<OtherT> otherCollection,
+        ValueState<Instant> oldestTimestampState,

Review comment:
       I can see how having a specific timestamp queue state would make it easier to have all the methods we need in a single "state type". Interestingly enough, the original design for state (before all the annotations) made it pretty easy to define composite types of state, whereas now we really cannot.
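
To make the idea of a single composite "state type" concrete, here is a minimal plain-Java sketch. It is not Beam API: `TimestampQueueSketch` and its methods (`add`, `readRange`, `clearBefore`) are hypothetical names, and an in-memory `TreeMap` stands in for runner-managed state. It only illustrates bundling the timestamp-ordered items together with the cleanup horizon that the DoFn above keeps in separate state cells.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical composite: timestamp-ordered values plus a cleanup horizon. Not Beam API. */
public class TimestampQueueSketch<T> {
  // Values grouped by event timestamp (milliseconds), kept in timestamp order.
  private final TreeMap<Long, List<T>> items = new TreeMap<>();
  // Everything strictly before this timestamp has already been cleaned up.
  private long cleanedBefore = Long.MIN_VALUE;

  /** Adds a value unless its timestamp falls in a range that was already cleaned up. */
  public boolean add(long timestampMillis, T value) {
    if (timestampMillis < cleanedBefore) {
      return false;
    }
    items.computeIfAbsent(timestampMillis, k -> new ArrayList<>()).add(value);
    return true;
  }

  /** Returns values with minInclusive <= timestamp < limitExclusive, in timestamp order. */
  public List<T> readRange(long minInclusive, long limitExclusive) {
    List<T> result = new ArrayList<>();
    for (List<T> bucket : items.subMap(minInclusive, true, limitExclusive, false).values()) {
      result.addAll(bucket);
    }
    return result;
  }

  /** Drops everything before limitExclusive and remembers that horizon. */
  public void clearBefore(long limitExclusive) {
    items.headMap(limitExclusive, false).clear();
    cleanedBefore = Math.max(cleanedBefore, limitExclusive);
  }
}
```

With a type like this, the "reject elements older than what was already cleaned up" rule lives next to the data it protects instead of being coordinated across several state declarations.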

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/PairCoder.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.join;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StructuredCoder;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+
+/** A {@link Coder} for {@link Pair}s that defers to underlying coders. */
+public class PairCoder<V1, V2> extends StructuredCoder<Pair<V1, V2>> {
+  private final Coder<V1> firstCoder;
+  private final Coder<V2> secondCoder;
+
+  private PairCoder(Coder<V1> firstCoder, Coder<V2> secondCoder) {
+    this.firstCoder = firstCoder;
+    this.secondCoder = secondCoder;
+  }
+
+  /** Returns a {@link PairCoder} for the given underlying value coders. */
+  public static <V1, V2> PairCoder<V1, V2> of(Coder<V1> firstCoder, Coder<V2> secondCoder) {
+    return new PairCoder<>(firstCoder, secondCoder);
+  }
+
+  @Override
+  public void encode(Pair<V1, V2> value, OutputStream outStream)
+      throws CoderException, IOException {
+    firstCoder.encode(value.getFirst(), outStream);

Review comment:
       Have we deprecated the coder "context" idea? Or do you just not want to apply it here? I would expect PairCoder to be identical to KvCoder, anyhow.
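
For readers unfamiliar with the coder "context" idea mentioned above, the following plain-Java sketch shows the general distinction as it is commonly described: a component that is followed by more data must be self-delimiting (here, length-prefixed), while the last component of an outermost value can simply run to the end of the stream. The class and method names are hypothetical, strings stand in for arbitrary values, and nothing here is Beam's actual `Coder` API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical illustration of nested vs. outermost encoding; not Beam's Coder API. */
public class PairEncodingSketch {

  /** Encodes two strings; the second is length-prefixed only when more data may follow. */
  public static byte[] encode(String first, String second, boolean nested) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      byte[] a = first.getBytes(StandardCharsets.UTF_8);
      byte[] b = second.getBytes(StandardCharsets.UTF_8);
      out.writeInt(a.length); // The first component is always followed by more data.
      out.write(a);
      if (nested) {
        out.writeInt(b.length); // Only needed when the pair is not the last thing written.
      }
      out.write(b);
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Decodes a pair written by encode with the same nested flag. */
  public static String[] decode(byte[] encoded, boolean nested) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
      byte[] a = new byte[in.readInt()];
      in.readFully(a);
      // In the outermost case the second component is whatever remains in the stream.
      byte[] b = new byte[nested ? in.readInt() : in.available()];
      in.readFully(b);
      return new String[] {
        new String(a, StandardCharsets.UTF_8), new String(b, StandardCharsets.UTF_8)
      };
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

The outermost form saves one length prefix per pair, at the cost of two encodings that must never be mixed up. That trade-off is what the question about applying the context here comes down to.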

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/EventTimeEquiJoin.java
##########
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.join;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.state.OrderedListState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerMap;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} on two PCollections. A
+ * {@link EventTimeEquiJoin} joins elements with equal keys bounded by the difference in event time.
+ * Currently only inner join is supported.
+ *
+ * <p>Example of performing a {@link EventTimeEquiJoin}:
+ *
+ * <pre>{@code
+ * PCollection<KV<K, V1>> pt1 = ...;
+ * PCollection<KV<K, V2>> pt2 = ...;
+ *
+ * PCollection<KV<K, Pair<V1, V2>>> eventTimeEquiJoinCollection =
+ *   pt1.apply(EventTimeEquiJoin.<K, V1, V2>innerJoin(pt2));
+ * }</pre>
+ *
+ * @param <K> the type of the keys in the input {@code PCollection}s
+ * @param <V1> the type of the value in the first {@code PCollection}
+ * @param <V2> the type of the value in the second {@code PCollection}
+ */
+@AutoValue

Review comment:
       `@Experimental`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] github-actions[bot] commented on pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15275:
URL: https://github.com/apache/beam/pull/15275#issuecomment-1083102345


   This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.





[GitHub] [beam] aaltay commented on pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

Posted by GitBox <gi...@apache.org>.
aaltay commented on pull request #15275:
URL: https://github.com/apache/beam/pull/15275#issuecomment-921088277


   @kennknowles @reuvenlax - Could you please take a look at this?





[GitHub] [beam] aaltay commented on pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

Posted by GitBox <gi...@apache.org>.
aaltay commented on pull request #15275:
URL: https://github.com/apache/beam/pull/15275#issuecomment-1018056256


   What is the next step on this PR? (I will stop pinging. Let me know if I can do anything to help.)





[GitHub] [beam] laraschmidt commented on a change in pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

Posted by GitBox <gi...@apache.org>.
laraschmidt commented on a change in pull request #15275:
URL: https://github.com/apache/beam/pull/15275#discussion_r683670918



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/EventTimeEquiJoin.java
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.join;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.state.OrderedListState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.TimerMap;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} on two PCollections. A
+ * {@link EventTimeEquiJoin} joins elements with equal keys bounded by the difference in event time.
+ *
+ * <p>Example of performing a {@link EventTimeEquiJoin}:
+ *
+ * <pre>{@code
+ * PCollection<KV<K, V1>> pt1 = ...;
+ * PCollection<KV<K, V2>> pt2 = ...;
+ *
+ * PCollection<KV<K, Pair<V1, V2>>> eventTimeEquiJoinCollection =
+ *   pt1.apply(EventTimeEquiJoin.<K, V1, V2>of(pt2));
+ *
+ * @param secondCollection the second collection to use in the join.
+ * @param <K> the type of the keys in the input {@code PCollection}s
+ * @param <V1> the type of the value in the first {@code PCollection}
+ * @param <V2> the type of the value in the second {@code PCollection}
+ * </pre>
+ */
+@AutoValue
+public abstract class EventTimeEquiJoin<K, V1, V2>

Review comment:
       I looked up what Flink and Spark call them. Spark doesn't really have a name for it. Flink's joins seem to be different (processing a table at a point in time instead of things being equivalent). Tyson's suggestion in the doc was EventTimeBoundedEquiJoin. I'm not sure I have any other suggestions. Maybe we could shorten it and move part of the name into the factory method, e.g. EventTimeBoundedJoin.innerEquiOf() or something like that?







[GitHub] [beam] reuvenlax commented on a change in pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on a change in pull request #15275:
URL: https://github.com/apache/beam/pull/15275#discussion_r682982535



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/EventTimeEquiJoin.java
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.join;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.state.OrderedListState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.TimerMap;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} on two PCollections. A
+ * {@link EventTimeEquiJoin} joins elements with equal keys bounded by the difference in event time.
+ *
+ * <p>Example of performing a {@link EventTimeEquiJoin}:
+ *
+ * <pre>{@code
+ * PCollection<KV<K, V1>> pt1 = ...;
+ * PCollection<KV<K, V2>> pt2 = ...;
+ *
+ * PCollection<KV<K, Pair<V1, V2>>> eventTimeEquiJoinCollection =
+ *   pt1.apply(EventTimeEquiJoin.<K, V1, V2>of(pt2));
+ *
+ * @param secondCollection the second collection to use in the join.
+ * @param <K> the type of the keys in the input {@code PCollection}s
+ * @param <V1> the type of the value in the first {@code PCollection}
+ * @param <V2> the type of the value in the second {@code PCollection}
+ * </pre>
+ */
+@AutoValue
+public abstract class EventTimeEquiJoin<K, V1, V2>
+    extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, Pair<V1, V2>>>> {
+  /** Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} on two PCollections. */
+  public static <K, V1, V2> EventTimeEquiJoin<K, V1, V2> of(
+      PCollection<KV<K, V2>> secondCollection) {
+    return new AutoValue_EventTimeEquiJoin.Builder<K, V1, V2>()
+        .setSecondCollection(secondCollection)
+        .setFirstCollectionValidFor(Duration.ZERO)
+        .setSecondCollectionValidFor(Duration.ZERO)
+        .setAllowedLateness(Duration.ZERO)
+        .build();
+  }
+
+  abstract PCollection<KV<K, V2>> getSecondCollection();
+
+  abstract Duration getFirstCollectionValidFor();
+
+  abstract Duration getSecondCollectionValidFor();
+
+  abstract Duration getAllowedLateness();
+
+  abstract Builder<K, V1, V2> toBuilder();
+
+  @AutoValue.Builder
+  public abstract static class Builder<K, V1, V2> {
+    public abstract Builder<K, V1, V2> setSecondCollection(PCollection<KV<K, V2>> value);
+
+    public abstract Builder<K, V1, V2> setFirstCollectionValidFor(Duration value);
+
+    public abstract Builder<K, V1, V2> setSecondCollectionValidFor(Duration value);
+
+    public abstract Builder<K, V1, V2> setAllowedLateness(Duration value);
+
+    abstract EventTimeEquiJoin<K, V1, V2> build();
+  }
+
+  /**
+   * Returns an {@link EventTimeEquiJoin} that matches elements from the first and second
+   * collection with the same keys if their timestamps are within the given interval.
+   *
+   * @param interval the allowed difference between the timestamps to allow a match
+   */
+  public EventTimeEquiJoin<K, V1, V2> within(Duration interval) {
+    return toBuilder()
+        .setFirstCollectionValidFor(interval)
+        .setSecondCollectionValidFor(interval)
+        .build();
+  }
+
+  /**
+   * Returns an {@link EventTimeEquiJoin} that matches elements from the first and second
+   * collection with the same keys if one collection's element falls within the valid time range
+   * for the other collection.
+   *
+   * @param firstCollectionValidFor the valid time range for the first collection
+   * @param secondCollectionValidFor the valid time range for the second collection
+   */
+  public EventTimeEquiJoin<K, V1, V2> within(
+      Duration firstCollectionValidFor, Duration secondCollectionValidFor) {
+    return toBuilder()
+        .setFirstCollectionValidFor(firstCollectionValidFor)
+        .setSecondCollectionValidFor(secondCollectionValidFor)
+        .build();
+  }
+
+  /**
+   * Returns an {@link EventTimeEquiJoin} that matches elements from the first and second
+   * collection with the same keys and allows for late elements.
+   *
+   * @param allowedLateness the amount of time late elements are allowed.
+   */
+  public EventTimeEquiJoin<K, V1, V2> withAllowedLateness(Duration allowedLateness) {
+    return toBuilder().setAllowedLateness(allowedLateness).build();
+  }
+
+  @Override
+  public PCollection<KV<K, Pair<V1, V2>>> expand(PCollection<KV<K, V1>> input) {
+    Coder<K> keyCoder = JoinUtils.getKeyCoder(input);
+    Coder<V1> firstValueCoder = JoinUtils.getValueCoder(input);
+    Coder<V2> secondValueCoder = JoinUtils.getValueCoder(getSecondCollection());
+    UnionCoder unionCoder = UnionCoder.of(ImmutableList.of(firstValueCoder, secondValueCoder));
+    KvCoder<K, RawUnionValue> kvCoder = KvCoder.of(JoinUtils.getKeyCoder(input), unionCoder);
+    PCollectionList<KV<K, RawUnionValue>> union =
+        PCollectionList.of(JoinUtils.makeUnionTable(0, input, kvCoder))
+            .and(JoinUtils.makeUnionTable(1, getSecondCollection(), kvCoder));
+    return union
+        .apply("Flatten", Flatten.pCollections())
+        .apply(
+            "Join",
+            ParDo.of(
+                new EventTimeEquiJoinDoFn<>(
+                    firstValueCoder,
+                    secondValueCoder,
+                    getFirstCollectionValidFor(),
+                    getSecondCollectionValidFor(),
+                    getAllowedLateness())))
+        .setCoder(KvCoder.of(keyCoder, PairCoder.<V1, V2>of(firstValueCoder, secondValueCoder)));
+  }
+
+  private static class EventTimeEquiJoinDoFn<K, V1, V2>
+      extends DoFn<KV<K, RawUnionValue>, KV<K, Pair<V1, V2>>> {
+    private static final int FIRST_TAG = 0;
+    private static final int SECOND_TAG = 1;
+
+    // Bucket cleanup timers into TIMER_BUCKET length buckets.
+    private static final long TIMER_BUCKET = Duration.standardMinutes(1).getMillis();
+
+    // How long elements in the first and second collection are valid (can be matched) for.
+    private final Duration firstCollectionValidFor;
+    private final Duration secondCollectionValidFor;
+
+    // How long past the watermark that late elements can show up.
+    private final Duration allowedLateness;
+
+    @StateId("v1Items")
+    private final StateSpec<OrderedListState<V1>> firstCollectionItems;
+
+    @StateId("v2Items")
+    private final StateSpec<OrderedListState<V2>> secondCollectionItems;
+
+    @TimerFamily("cleanupTimers")
+    private final TimerSpec cleanupTimers = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
+
+    public EventTimeEquiJoinDoFn(
+        Coder<V1> firstValueCoder,
+        Coder<V2> secondValueCoder,
+        Duration firstValidFor,
+        Duration secondValidFor,
+        Duration allowedLateness) {
+      this.firstCollectionValidFor = firstValidFor;
+      this.secondCollectionValidFor = secondValidFor;
+      this.allowedLateness = allowedLateness;
+      firstCollectionItems = StateSpecs.orderedList(firstValueCoder);
+      secondCollectionItems = StateSpecs.orderedList(secondValueCoder);
+    }
+
+    @FunctionalInterface
+    private interface Output<T1, T2> {
+      void apply(T1 one, T2 two);
+    }
+
+    private <T, O> void processHelper(
+        Output<T, O> output,
+        KV<K, RawUnionValue> element,
+        Instant ts,
+        OrderedListState<T> thisCollection,
+        OrderedListState<O> otherCollection,
+        TimerMap cleanupTimers,
+        Duration thisCollectionValidFor,
+        Duration otherCollectionValidFor) {
+      thisCollection.add(TimestampedValue.of((T) element.getValue().getValue(), ts));
+      Instant beginning = ts.minus(otherCollectionValidFor);
+      Instant end = ts.plus(thisCollectionValidFor).plus(1L);
+      for (TimestampedValue<O> value : otherCollection.readRange(beginning, end)) {
+        output.apply((T) element.getValue().getValue(), value.getValue());
+      }
+      Instant cleanupTime = ts.plus(allowedLateness).plus(thisCollectionValidFor);
+      Instant nextBucketStart =
+          Instant.ofEpochMilli(
+              cleanupTime.getMillis() / TIMER_BUCKET * TIMER_BUCKET + TIMER_BUCKET);
+      cleanupTimers.get(Long.toString(nextBucketStart.getMillis())).set(nextBucketStart);
+    }
+
+    @ProcessElement
+    public void process(
+        ProcessContext context,
+        @Element KV<K, RawUnionValue> element,
+        @Timestamp Instant ts,
+        @StateId("v1Items") OrderedListState<V1> firstItems,
+        @StateId("v2Items") OrderedListState<V2> secondItems,
+        @TimerFamily("cleanupTimers") TimerMap cleanupTimers) {
+      switch (element.getValue().getUnionTag()) {
+        case FIRST_TAG:
+          processHelper(
+              (V1 v1, V2 v2) -> {
+                context.output(KV.of(element.getKey(), Pair.of(v1, v2)));
+              },
+              element,
+              ts,
+              firstItems,
+              secondItems,
+              cleanupTimers,
+              firstCollectionValidFor,
+              secondCollectionValidFor);
+          break;
+        case SECOND_TAG:
+          processHelper(
+              (V2 v2, V1 v1) -> {
+                context.output(KV.of(element.getKey(), Pair.of(v1, v2)));
+              },
+              element,
+              ts,
+              secondItems,
+              firstItems,
+              cleanupTimers,
+              secondCollectionValidFor,
+              firstCollectionValidFor);
+      }
+    }
+
+    @OnTimerFamily("cleanupTimers")
+    public void onCleanupTimer(
+        @TimerId String timerId,
+        @StateId("v1Items") OrderedListState<V1> firstItems,
+        @StateId("v2Items") OrderedListState<V2> secondItems) {
+      Instant currentTime = Instant.ofEpochMilli(Long.valueOf(timerId)).minus(allowedLateness);
+      firstItems.clearRange(Instant.ofEpochMilli(0), currentTime.minus(firstCollectionValidFor));

Review comment:
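       Start the range at `BoundedWindow.TIMESTAMP_MIN_VALUE` instead of `Instant.ofEpochMilli(0)`, so that elements with timestamps before the epoch are cleared as well.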
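
A small plain-Java sketch of why the lower bound of the cleared range matters. It does not use Beam: a `TreeMap` keyed by millisecond timestamps stands in for the ordered list state, `clearRange` here is a hypothetical helper with a half-open `[min, limit)` range, and `MIN_TIMESTAMP_MILLIS` is an assumed stand-in for the smallest representable timestamp.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical illustration of clearing a timestamp range; not Beam API. */
public class ClearRangeSketch {
  // Assumed stand-in for the minimum timestamp a runner could hand to the DoFn.
  public static final long MIN_TIMESTAMP_MILLIS = Long.MIN_VALUE;

  /** Removes entries with minInclusive <= timestamp < limitExclusive; returns how many. */
  public static int clearRange(
      NavigableMap<Long, String> items, long minInclusive, long limitExclusive) {
    NavigableMap<Long, String> range = items.subMap(minInclusive, true, limitExclusive, false);
    int removed = range.size();
    range.clear(); // Clearing the view removes the entries from the backing map.
    return removed;
  }

  /** Three buffered elements, one of them timestamped before the epoch. */
  public static TreeMap<Long, String> sample() {
    TreeMap<Long, String> items = new TreeMap<>();
    items.put(-5_000L, "before epoch");
    items.put(10L, "just after epoch");
    items.put(70_000L, "still valid");
    return items;
  }
}
```

Clearing `sample()` from `0` up to `60_000` leaves the element at `-5_000` behind indefinitely, whereas starting from `MIN_TIMESTAMP_MILLIS` removes it along with the other expired element.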







[GitHub] [beam] reuvenlax commented on a change in pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on a change in pull request #15275:
URL: https://github.com/apache/beam/pull/15275#discussion_r682982535



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/EventTimeEquiJoin.java
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.join;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.state.OrderedListState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.TimerMap;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} on two PCollections. A
+ * {@link EventTimeEquiJoin} joins elements with equal keys bounded by the difference in event time.
+ *
+ * <p>Example of performing a {@link EventTimeEquiJoin}:
+ *
+ * <pre>{@code
+ * PCollection<KV<K, V1>> pt1 = ...;
+ * PCollection<KV<K, V2>> pt2 = ...;
+ *
+ * PCollection<KV<K, Pair<V1, V2>>> eventTimeEquiJoinCollection =
+ *   pt1.apply(EventTimeEquiJoin.<K, V1, V2>of(pt2));
+ * }</pre>
+ *
+ * @param secondCollection the second collection to use in the join.
+ * @param <K> the type of the keys in the input {@code PCollection}s
+ * @param <V1> the type of the value in the first {@code PCollection}
+ * @param <V2> the type of the value in the second {@code PCollection}
+ */
+@AutoValue
+public abstract class EventTimeEquiJoin<K, V1, V2>
+    extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, Pair<V1, V2>>>> {
+  /** Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} on two PCollections. */
+  public static <K, V1, V2> EventTimeEquiJoin<K, V1, V2> of(
+      PCollection<KV<K, V2>> secondCollection) {
+    return new AutoValue_EventTimeEquiJoin.Builder<K, V1, V2>()
+        .setSecondCollection(secondCollection)
+        .setFirstCollectionValidFor(Duration.ZERO)
+        .setSecondCollectionValidFor(Duration.ZERO)
+        .setAllowedLateness(Duration.ZERO)
+        .build();
+  }
+
+  abstract PCollection<KV<K, V2>> getSecondCollection();
+
+  abstract Duration getFirstCollectionValidFor();
+
+  abstract Duration getSecondCollectionValidFor();
+
+  abstract Duration getAllowedLateness();
+
+  abstract Builder<K, V1, V2> toBuilder();
+
+  @AutoValue.Builder
+  public abstract static class Builder<K, V1, V2> {
+    public abstract Builder<K, V1, V2> setSecondCollection(PCollection<KV<K, V2>> value);
+
+    public abstract Builder<K, V1, V2> setFirstCollectionValidFor(Duration value);
+
+    public abstract Builder<K, V1, V2> setSecondCollectionValidFor(Duration value);
+
+    public abstract Builder<K, V1, V2> setAllowedLateness(Duration value);
+
+    abstract EventTimeEquiJoin<K, V1, V2> build();
+  }
+
+  /**
+   * Returns a {@code Impl<K, V1, V2>} {@code PTransform} that matches elements from the first and
+   * second collection with the same keys if their timestamps are within the given interval.
+   *
+   * @param interval the allowed difference between the timestamps to allow a match
+   */
+  public EventTimeEquiJoin<K, V1, V2> within(Duration interval) {
+    return toBuilder()
+        .setFirstCollectionValidFor(interval)
+        .setSecondCollectionValidFor(interval)
+        .build();
+  }
+
+  /**
+   * Returns a {@code Impl<K, V1, V2>} {@code PTransform} that matches elements from the first and
+   * second collection with the same keys if the collection's element comes within the valid time
+   * range for the other collection.
+   *
+   * @param firstCollectionValidFor the valid time range for the first collection
+   * @param secondCollectionValidFor the valid time range for the second collection
+   */
+  public EventTimeEquiJoin<K, V1, V2> within(
+      Duration firstCollectionValidFor, Duration secondCollectionValidFor) {
+    return toBuilder()
+        .setFirstCollectionValidFor(firstCollectionValidFor)
+        .setSecondCollectionValidFor(secondCollectionValidFor)
+        .build();
+  }
+
+  /**
+   * Returns a {@code Impl<K, V1, V2>} {@code PTransform} that matches elements from the first and
+   * second collection with the same keys and allows for late elements
+   *
+   * @param allowedLateness the amount of time late elements are allowed.
+   */
+  public EventTimeEquiJoin<K, V1, V2> withAllowedLateness(Duration allowedLateness) {
+    return toBuilder().setAllowedLateness(allowedLateness).build();
+  }
+
+  @Override
+  public PCollection<KV<K, Pair<V1, V2>>> expand(PCollection<KV<K, V1>> input) {
+    Coder<K> keyCoder = JoinUtils.getKeyCoder(input);
+    Coder<V1> firstValueCoder = JoinUtils.getValueCoder(input);
+    Coder<V2> secondValueCoder = JoinUtils.getValueCoder(getSecondCollection());
+    UnionCoder unionCoder = UnionCoder.of(ImmutableList.of(firstValueCoder, secondValueCoder));
+    KvCoder<K, RawUnionValue> kvCoder = KvCoder.of(JoinUtils.getKeyCoder(input), unionCoder);
+    PCollectionList<KV<K, RawUnionValue>> union =
+        PCollectionList.of(JoinUtils.makeUnionTable(0, input, kvCoder))
+            .and(JoinUtils.makeUnionTable(1, getSecondCollection(), kvCoder));
+    return union
+        .apply("Flatten", Flatten.pCollections())
+        .apply(
+            "Join",
+            ParDo.of(
+                new EventTimeEquiJoinDoFn<>(
+                    firstValueCoder,
+                    secondValueCoder,
+                    getFirstCollectionValidFor(),
+                    getSecondCollectionValidFor(),
+                    getAllowedLateness())))
+        .setCoder(KvCoder.of(keyCoder, PairCoder.<V1, V2>of(firstValueCoder, secondValueCoder)));
+  }
+
+  private static class EventTimeEquiJoinDoFn<K, V1, V2>
+      extends DoFn<KV<K, RawUnionValue>, KV<K, Pair<V1, V2>>> {
+    private static final int FIRST_TAG = 0;
+    private static final int SECOND_TAG = 1;
+
+    // Bucket cleanup timers into TIMER_BUCKET length buckets.
+    private static final long TIMER_BUCKET = Duration.standardMinutes(1).getMillis();
+
+    // How long elements in the first and second collection are valid (can be matched) for.
+    private final Duration firstCollectionValidFor;
+    private final Duration secondCollectionValidFor;
+
+    // How long past the watermark that late elements can show up.
+    private final Duration allowedLateness;
+
+    @StateId("v1Items")
+    private final StateSpec<OrderedListState<V1>> firstCollectionItems;
+
+    @StateId("v2Items")
+    private final StateSpec<OrderedListState<V2>> secondCollectionItems;
+
+    @TimerFamily("cleanupTimers")
+    private final TimerSpec cleanupTimers = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
+
+    public EventTimeEquiJoinDoFn(
+        Coder<V1> firstValueCoder,
+        Coder<V2> secondValueCoder,
+        Duration firstValidFor,
+        Duration secondValidFor,
+        Duration allowedLateness) {
+      this.firstCollectionValidFor = firstValidFor;
+      this.secondCollectionValidFor = secondValidFor;
+      this.allowedLateness = allowedLateness;
+      firstCollectionItems = StateSpecs.orderedList(firstValueCoder);
+      secondCollectionItems = StateSpecs.orderedList(secondValueCoder);
+    }
+
+    @FunctionalInterface
+    private interface Output<T1, T2> {
+      void apply(T1 one, T2 two);
+    }
+
+    private <T, O> void processHelper(
+        Output<T, O> output,
+        KV<K, RawUnionValue> element,
+        Instant ts,
+        OrderedListState<T> thisCollection,
+        OrderedListState<O> otherCollection,
+        TimerMap cleanupTimers,
+        Duration thisCollectionValidFor,
+        Duration otherCollectionValidFor) {
+      thisCollection.add(TimestampedValue.of((T) element.getValue().getValue(), ts));
+      Instant beginning = ts.minus(otherCollectionValidFor);
+      Instant end = ts.plus(thisCollectionValidFor).plus(1L);
+      for (TimestampedValue<O> value : otherCollection.readRange(beginning, end)) {
+        output.apply((T) element.getValue().getValue(), value.getValue());
+      }
+      Instant cleanupTime = ts.plus(allowedLateness).plus(thisCollectionValidFor);
+      Instant nextBucketStart =
+          Instant.ofEpochMilli(
+              cleanupTime.getMillis() / TIMER_BUCKET * TIMER_BUCKET + TIMER_BUCKET);
+      cleanupTimers.get(Long.toString(nextBucketStart.getMillis())).set(nextBucketStart);
+    }
+
+    @ProcessElement
+    public void process(
+        ProcessContext context,
+        @Element KV<K, RawUnionValue> element,
+        @Timestamp Instant ts,
+        @StateId("v1Items") OrderedListState<V1> firstItems,
+        @StateId("v2Items") OrderedListState<V2> secondItems,
+        @TimerFamily("cleanupTimers") TimerMap cleanupTimers) {
+      switch (element.getValue().getUnionTag()) {
+        case FIRST_TAG:
+          processHelper(
+              (V1 v1, V2 v2) -> {
+                context.output(KV.of(element.getKey(), Pair.of(v1, v2)));
+              },
+              element,
+              ts,
+              firstItems,
+              secondItems,
+              cleanupTimers,
+              firstCollectionValidFor,
+              secondCollectionValidFor);
+          break;
+        case SECOND_TAG:
+          processHelper(
+              (V2 v2, V1 v1) -> {
+                context.output(KV.of(element.getKey(), Pair.of(v1, v2)));
+              },
+              element,
+              ts,
+              secondItems,
+              firstItems,
+              cleanupTimers,
+              secondCollectionValidFor,
+              firstCollectionValidFor);
+      }
+    }
+
+    @OnTimerFamily("cleanupTimers")
+    public void onCleanupTimer(
+        @TimerId String timerId,
+        @StateId("v1Items") OrderedListState<V1> firstItems,
+        @StateId("v2Items") OrderedListState<V2> secondItems) {
+      Instant currentTime = Instant.ofEpochMilli(Long.valueOf(timerId)).minus(allowedLateness);
+      firstItems.clearRange(Instant.ofEpochMilli(0), currentTime.minus(firstCollectionValidFor));

Review comment:
       This range should start at BoundedWindow.TIMESTAMP_MIN_VALUE rather than Instant.ofEpochMilli(0).
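
To illustrate the comment above, here is a minimal, self-contained sketch of why a cleanup range that starts at epoch 0 can leave elements behind. It uses a `TreeMap` keyed by epoch milliseconds as a stand-in for `OrderedListState`, and `Long.MIN_VALUE` as a stand-in for `BoundedWindow.TIMESTAMP_MIN_VALUE`. Both are assumptions made for illustration, as are the class and method names; none of this is Beam's API. The sketch also assumes the cleared range is half-open, i.e. `[start, limit)`.

```java
import java.util.TreeMap;

// Hypothetical stand-in for the cleanup logic under review; not Beam code.
class ClearRangeSketch {
  // Stand-in for BoundedWindow.TIMESTAMP_MIN_VALUE (an assumption for this sketch).
  static final long MIN_TIMESTAMP = Long.MIN_VALUE;

  // Builds three buffered items, clears those with timestamps in [start, limit),
  // and returns how many items remain.
  static int remainingAfterClear(long start, long limit) {
    TreeMap<Long, String> items = new TreeMap<>();
    items.put(-5_000L, "before-epoch"); // event time earlier than 1970-01-01
    items.put(1_000L, "after-epoch");
    items.put(90_000L, "still-valid");
    // Inclusive lower bound, exclusive upper bound.
    items.subMap(start, true, limit, false).clear();
    return items.size();
  }

  public static void main(String[] args) {
    long limit = 60_000L;
    // Starting at epoch 0 skips the element with a negative timestamp.
    System.out.println(remainingAfterClear(0L, limit));
    // Starting at the minimum timestamp clears everything below the limit.
    System.out.println(remainingAfterClear(MIN_TIMESTAMP, limit));
  }
}
```

In this sketch the element with a negative timestamp survives the first cleanup and is removed by the second, which is the kind of leftover state the suggested lower bound would avoid.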

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/EventTimeEquiJoin.java
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.join;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.state.OrderedListState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.TimerMap;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} on two PCollections. A
+ * {@link EventTimeEquiJoin} joins elements with equal keys bounded by the difference in event time.
+ *
+ * <p>Example of performing a {@link EventTimeEquiJoin}:
+ *
+ * <pre>{@code
+ * PCollection<KV<K, V1>> pt1 = ...;
+ * PCollection<KV<K, V2>> pt2 = ...;
+ *
+ * PCollection<KV<K, Pair<V1, V2>>> eventTimeEquiJoinCollection =
+ *   pt1.apply(EventTimeEquiJoin.<K, V1, V2>of(pt2));
+ * }</pre>
+ *
+ * @param secondCollection the second collection to use in the join.
+ * @param <K> the type of the keys in the input {@code PCollection}s
+ * @param <V1> the type of the value in the first {@code PCollection}
+ * @param <V2> the type of the value in the second {@code PCollection}
+ */
+@AutoValue
+public abstract class EventTimeEquiJoin<K, V1, V2>

Review comment:
       I would like to think about the name a bit more; EventTimeEquiJoin seems a bit awkward to me.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/Pair.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.join;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
+
+/** A pair of two different objects. */
+@DefaultSchema(AutoValueSchema.class)

Review comment:
       Given that schema inference does not currently infer V1 and V2 properly, I'm not sure we should specify DefaultSchema here.
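
One plausible reading of the concern above, sketched with plain Java reflection rather than Beam's schema machinery: on a generic class, the declared return types of the getters are type variables, so inspecting the class alone cannot tell what concrete types V1 and V2 stand for. `PairSketch` is a hypothetical stand-in for the `Pair` class in this PR, and whether `AutoValueSchema` fails for exactly this reason is an assumption, not something the comment states.

```java
import java.lang.reflect.Method;
import java.lang.reflect.TypeVariable;

// Hypothetical stand-in for the PR's Pair<V1, V2>; not Beam code.
abstract class PairSketch<V1, V2> {
  abstract V1 getFirst();

  abstract V2 getSecond();

  // Looks up a no-argument getter declared on PairSketch.
  private static Method getter(String name) {
    try {
      return PairSketch.class.getDeclaredMethod(name);
    } catch (NoSuchMethodException e) {
      throw new IllegalArgumentException("no such getter: " + name, e);
    }
  }

  // True if the getter's declared return type is an unresolved type variable.
  static boolean isUnresolved(String getterName) {
    return getter(getterName).getGenericReturnType() instanceof TypeVariable;
  }

  // The erased return type that remains visible at runtime.
  static Class<?> erasedReturnType(String getterName) {
    return getter(getterName).getReturnType();
  }

  public static void main(String[] args) {
    System.out.println(isUnresolved("getFirst"));
    System.out.println(isUnresolved("getSecond"));
    System.out.println(erasedReturnType("getFirst"));
  }
}
```

Under that reading, a schema derived from the class alone would have no concrete field types for the two values, which is consistent with the hesitation about annotating the class with `DefaultSchema`.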

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/EventTimeEquiJoin.java
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.join;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.state.OrderedListState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.TimerMap;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} on two PCollections. A
+ * {@link EventTimeEquiJoin} joins elements with equal keys bounded by the difference in event time.
+ *
+ * <p>Example of performing a {@link EventTimeEquiJoin}:
+ *
+ * <pre>{@code
+ * PCollection<KV<K, V1>> pt1 = ...;
+ * PCollection<KV<K, V2>> pt2 = ...;
+ *
+ * PCollection<KV<K, Pair<V1, V2>>> eventTimeEquiJoinCollection =
+ *   pt1.apply(EventTimeEquiJoin.<K, V1, V2>of(pt2));
+ * }</pre>
+ *
+ * @param secondCollection the second collection to use in the join.
+ * @param <K> the type of the keys in the input {@code PCollection}s
+ * @param <V1> the type of the value in the first {@code PCollection}
+ * @param <V2> the type of the value in the second {@code PCollection}
+ */
+@AutoValue
+public abstract class EventTimeEquiJoin<K, V1, V2>
+    extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, Pair<V1, V2>>>> {
+  /** Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} on two PCollections. */
+  public static <K, V1, V2> EventTimeEquiJoin<K, V1, V2> of(
+      PCollection<KV<K, V2>> secondCollection) {
+    return new AutoValue_EventTimeEquiJoin.Builder<K, V1, V2>()
+        .setSecondCollection(secondCollection)
+        .setFirstCollectionValidFor(Duration.ZERO)
+        .setSecondCollectionValidFor(Duration.ZERO)
+        .setAllowedLateness(Duration.ZERO)
+        .build();
+  }
+
+  abstract PCollection<KV<K, V2>> getSecondCollection();
+
+  abstract Duration getFirstCollectionValidFor();
+
+  abstract Duration getSecondCollectionValidFor();
+
+  abstract Duration getAllowedLateness();
+
+  abstract Builder<K, V1, V2> toBuilder();
+
+  @AutoValue.Builder
+  public abstract static class Builder<K, V1, V2> {
+    public abstract Builder<K, V1, V2> setSecondCollection(PCollection<KV<K, V2>> value);
+
+    public abstract Builder<K, V1, V2> setFirstCollectionValidFor(Duration value);
+
+    public abstract Builder<K, V1, V2> setSecondCollectionValidFor(Duration value);
+
+    public abstract Builder<K, V1, V2> setAllowedLateness(Duration value);
+
+    abstract EventTimeEquiJoin<K, V1, V2> build();
+  }
+
+  /**
+   * Returns a {@code Impl<K, V1, V2>} {@code PTransform} that matches elements from the first and
+   * second collection with the same keys if their timestamps are within the given interval.
+   *
+   * @param interval the allowed difference between the timestamps to allow a match
+   */
+  public EventTimeEquiJoin<K, V1, V2> within(Duration interval) {
+    return toBuilder()
+        .setFirstCollectionValidFor(interval)
+        .setSecondCollectionValidFor(interval)
+        .build();
+  }
+
+  /**
+   * Returns a {@code Impl<K, V1, V2>} {@code PTransform} that matches elements from the first and
+   * second collection with the same keys if the collection's element comes within the valid time
+   * range for the other collection.
+   *
+   * @param firstCollectionValidFor the valid time range for the first collection
+   * @param secondCollectionValidFor the valid time range for the second collection
+   */
+  public EventTimeEquiJoin<K, V1, V2> within(
+      Duration firstCollectionValidFor, Duration secondCollectionValidFor) {
+    return toBuilder()
+        .setFirstCollectionValidFor(firstCollectionValidFor)
+        .setSecondCollectionValidFor(secondCollectionValidFor)
+        .build();
+  }
+
+  /**
+   * Returns a {@code Impl<K, V1, V2>} {@code PTransform} that matches elements from the first and
+   * second collection with the same keys and allows for late elements
+   *
+   * @param allowedLateness the amount of time late elements are allowed.
+   */
+  public EventTimeEquiJoin<K, V1, V2> withAllowedLateness(Duration allowedLateness) {
+    return toBuilder().setAllowedLateness(allowedLateness).build();
+  }
+
+  @Override
+  public PCollection<KV<K, Pair<V1, V2>>> expand(PCollection<KV<K, V1>> input) {
+    Coder<K> keyCoder = JoinUtils.getKeyCoder(input);
+    Coder<V1> firstValueCoder = JoinUtils.getValueCoder(input);
+    Coder<V2> secondValueCoder = JoinUtils.getValueCoder(getSecondCollection());
+    UnionCoder unionCoder = UnionCoder.of(ImmutableList.of(firstValueCoder, secondValueCoder));
+    KvCoder<K, RawUnionValue> kvCoder = KvCoder.of(JoinUtils.getKeyCoder(input), unionCoder);
+    PCollectionList<KV<K, RawUnionValue>> union =
+        PCollectionList.of(JoinUtils.makeUnionTable(0, input, kvCoder))
+            .and(JoinUtils.makeUnionTable(1, getSecondCollection(), kvCoder));
+    return union
+        .apply("Flatten", Flatten.pCollections())
+        .apply(
+            "Join",
+            ParDo.of(
+                new EventTimeEquiJoinDoFn<>(
+                    firstValueCoder,
+                    secondValueCoder,
+                    getFirstCollectionValidFor(),
+                    getSecondCollectionValidFor(),
+                    getAllowedLateness())))
+        .setCoder(KvCoder.of(keyCoder, PairCoder.<V1, V2>of(firstValueCoder, secondValueCoder)));
+  }
+
+  private static class EventTimeEquiJoinDoFn<K, V1, V2>
+      extends DoFn<KV<K, RawUnionValue>, KV<K, Pair<V1, V2>>> {
+    private static final int FIRST_TAG = 0;
+    private static final int SECOND_TAG = 1;
+
+    // Bucket cleanup timers into TIMER_BUCKET length buckets.
+    private static final long TIMER_BUCKET = Duration.standardMinutes(1).getMillis();
+
+    // How long elements in the first and second collection are valid (can be matched) for.
+    private final Duration firstCollectionValidFor;
+    private final Duration secondCollectionValidFor;
+
+    // How long past the watermark that late elements can show up.
+    private final Duration allowedLateness;
+
+    @StateId("v1Items")
+    private final StateSpec<OrderedListState<V1>> firstCollectionItems;
+
+    @StateId("v2Items")
+    private final StateSpec<OrderedListState<V2>> secondCollectionItems;
+
+    @TimerFamily("cleanupTimers")
+    private final TimerSpec cleanupTimers = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
+
+    public EventTimeEquiJoinDoFn(
+        Coder<V1> firstValueCoder,
+        Coder<V2> secondValueCoder,
+        Duration firstValidFor,
+        Duration secondValidFor,
+        Duration allowedLateness) {
+      this.firstCollectionValidFor = firstValidFor;
+      this.secondCollectionValidFor = secondValidFor;
+      this.allowedLateness = allowedLateness;
+      firstCollectionItems = StateSpecs.orderedList(firstValueCoder);
+      secondCollectionItems = StateSpecs.orderedList(secondValueCoder);
+    }
+
+    @FunctionalInterface
+    private interface Output<T1, T2> {
+      void apply(T1 one, T2 two);
+    }
+
+    private <T, O> void processHelper(
+        Output<T, O> output,
+        KV<K, RawUnionValue> element,
+        Instant ts,
+        OrderedListState<T> thisCollection,
+        OrderedListState<O> otherCollection,
+        TimerMap cleanupTimers,
+        Duration thisCollectionValidFor,
+        Duration otherCollectionValidFor) {
+      thisCollection.add(TimestampedValue.of((T) element.getValue().getValue(), ts));
+      Instant beginning = ts.minus(otherCollectionValidFor);
+      Instant end = ts.plus(thisCollectionValidFor).plus(1L);
+      for (TimestampedValue<O> value : otherCollection.readRange(beginning, end)) {
+        output.apply((T) element.getValue().getValue(), value.getValue());
+      }
+      Instant cleanupTime = ts.plus(allowedLateness).plus(thisCollectionValidFor);
+      Instant nextBucketStart =
+          Instant.ofEpochMilli(
+              cleanupTime.getMillis() / TIMER_BUCKET * TIMER_BUCKET + TIMER_BUCKET);
+      cleanupTimers.get(Long.toString(nextBucketStart.getMillis())).set(nextBucketStart);
+    }
+
+    @ProcessElement
+    public void process(
+        ProcessContext context,
+        @Element KV<K, RawUnionValue> element,
+        @Timestamp Instant ts,
+        @StateId("v1Items") OrderedListState<V1> firstItems,
+        @StateId("v2Items") OrderedListState<V2> secondItems,
+        @TimerFamily("cleanupTimers") TimerMap cleanupTimers) {
+      switch (element.getValue().getUnionTag()) {
+        case FIRST_TAG:
+          processHelper(
+              (V1 v1, V2 v2) -> {
+                context.output(KV.of(element.getKey(), Pair.of(v1, v2)));
+              },
+              element,
+              ts,
+              firstItems,
+              secondItems,
+              cleanupTimers,
+              firstCollectionValidFor,
+              secondCollectionValidFor);
+          break;
+        case SECOND_TAG:
+          processHelper(
+              (V2 v2, V1 v1) -> {
+                context.output(KV.of(element.getKey(), Pair.of(v1, v2)));
+              },
+              element,
+              ts,
+              secondItems,
+              firstItems,
+              cleanupTimers,
+              secondCollectionValidFor,
+              firstCollectionValidFor);

Review comment:
       default:
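
       The switch in the hunk above has no `default:` branch, so an element with an unexpected union tag would be silently dropped. One reading of this comment is that the switch should fail loudly instead. A minimal plain-Java sketch of that idea, with no Beam dependency; the class `UnionTagDispatch` and the method `describe` are hypothetical names used only for illustration:

```java
final class UnionTagDispatch {
  static final int FIRST_TAG = 0;
  static final int SECOND_TAG = 1;

  /** Maps a union tag to the side of the join it belongs to. */
  static String describe(int unionTag) {
    switch (unionTag) {
      case FIRST_TAG:
        return "first";
      case SECOND_TAG:
        return "second";
      default:
        // Fail fast rather than ignoring an element with an unknown tag.
        throw new IllegalArgumentException("Unexpected union tag: " + unionTag);
    }
  }
}
```

       Whether to throw or to log and skip is a design choice for the PR author; throwing makes a mis-tagged element visible right away.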

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/EventTimeEquiJoin.java
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.join;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.state.OrderedListState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.TimerMap;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} on two PCollections. A
+ * {@link EventTimeEquiJoin} joins elements with equal keys bounded by the difference in event time.
+ *
+ * <p>Example of performing a {@link EventTimeEquiJoin}:
+ *
+ * <pre>{@code
+ * PCollection<KV<K, V1>> pt1 = ...;
+ * PCollection<KV<K, V2>> pt2 = ...;
+ *
+ * PCollection<KV<K, Pair<V1, V2>> eventTimeEquiJoinCollection =
+ *   pt1.apply(EventTimeEquiJoin.<K, V1, V2>of(pt2));
+ *
+ * @param secondCollection the second collection to use in the join.
+ * @param <K> the type of the keys in the input {@code PCollection}s
+ * @param <V1> the type of the value in the first {@code PCollection}
+ * @param <V2> the type of the value in the second {@code PCollection}
+ * </pre>
+ */
+@AutoValue
+public abstract class EventTimeEquiJoin<K, V1, V2>

Review comment:
       I also wonder whether this PTransform should be expressed in terms of KVs. However, I'm starting to think that it should be, and then we have the option to wrap it in a higher-level Join transform.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/EventTimeEquiJoin.java
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.join;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.state.OrderedListState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.TimerMap;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} on two PCollections. A
+ * {@link EventTimeEquiJoin} joins elements with equal keys bounded by the difference in event time.
+ *
+ * <p>Example of performing a {@link EventTimeEquiJoin}:
+ *
+ * <pre>{@code
+ * PCollection<KV<K, V1>> pt1 = ...;
+ * PCollection<KV<K, V2>> pt2 = ...;
+ *
+ * PCollection<KV<K, Pair<V1, V2>> eventTimeEquiJoinCollection =
+ *   pt1.apply(EventTimeEquiJoin.<K, V1, V2>of(pt2));
+ *
+ * @param secondCollection the second collection to use in the join.
+ * @param <K> the type of the keys in the input {@code PCollection}s
+ * @param <V1> the type of the value in the first {@code PCollection}
+ * @param <V2> the type of the value in the second {@code PCollection}
+ * </pre>
+ */
+@AutoValue
+public abstract class EventTimeEquiJoin<K, V1, V2>
+    extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, Pair<V1, V2>>>> {
+  /** Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} on two PCollections. */
+  public static <K, V1, V2> EventTimeEquiJoin<K, V1, V2> of(
+      PCollection<KV<K, V2>> secondCollection) {
+    return new AutoValue_EventTimeEquiJoin.Builder<K, V1, V2>()
+        .setSecondCollection(secondCollection)
+        .setFirstCollectionValidFor(Duration.ZERO)
+        .setSecondCollectionValidFor(Duration.ZERO)
+        .setAllowedLateness(Duration.ZERO)
+        .build();
+  }
+
+  abstract PCollection<KV<K, V2>> getSecondCollection();
+
+  abstract Duration getFirstCollectionValidFor();
+
+  abstract Duration getSecondCollectionValidFor();
+
+  abstract Duration getAllowedLateness();
+
+  abstract Builder<K, V1, V2> toBuilder();
+
+  @AutoValue.Builder
+  public abstract static class Builder<K, V1, V2> {
+    public abstract Builder<K, V1, V2> setSecondCollection(PCollection<KV<K, V2>> value);
+
+    public abstract Builder<K, V1, V2> setFirstCollectionValidFor(Duration value);
+
+    public abstract Builder<K, V1, V2> setSecondCollectionValidFor(Duration value);
+
+    public abstract Builder<K, V1, V2> setAllowedLateness(Duration value);
+
+    abstract EventTimeEquiJoin<K, V1, V2> build();
+  }
+
+  /**
+   * Returns a {@code Impl<K, V1, V2>} {@code PTransform} that matches elements from the first and
+   * second collection with the same keys if their timestamps are within the given interval.
+   *
+   * @param interval the allowed difference between the timestamps to allow a match
+   */
+  public EventTimeEquiJoin<K, V1, V2> within(Duration interval) {
+    return toBuilder()
+        .setFirstCollectionValidFor(interval)
+        .setSecondCollectionValidFor(interval)
+        .build();
+  }
+
+  /**
+   * Returns a {@code Impl<K, V1, V2>} {@code PTransform} that matches elements from the first and
+   * second collection with the same keys if the collection's element comes within the valid time
+   * range for the other collection.
+   *
+   * @param firstCollectionValidFor the valid time range for the first collection
+   * @param secondCollectionValidFor the valid time range for the second collection
+   */
+  public EventTimeEquiJoin<K, V1, V2> within(
+      Duration firstCollectionValidFor, Duration secondCollectionValidFor) {
+    return toBuilder()
+        .setFirstCollectionValidFor(firstCollectionValidFor)
+        .setSecondCollectionValidFor(secondCollectionValidFor)
+        .build();
+  }
+
+  /**
+   * Returns a {@code Impl<K, V1, V2>} {@code PTransform} that matches elements from the first and
+   * second collection with the same keys and allows for late elements
+   *
+   * @param allowedLateness the amount of time late elements are allowed.
+   */
+  public EventTimeEquiJoin<K, V1, V2> withAllowedLateness(Duration allowedLateness) {
+    return toBuilder().setAllowedLateness(allowedLateness).build();
+  }
+
+  @Override
+  public PCollection<KV<K, Pair<V1, V2>>> expand(PCollection<KV<K, V1>> input) {
+    Coder<K> keyCoder = JoinUtils.getKeyCoder(input);
+    Coder<V1> firstValueCoder = JoinUtils.getValueCoder(input);
+    Coder<V2> secondValueCoder = JoinUtils.getValueCoder(getSecondCollection());
+    UnionCoder unionCoder = UnionCoder.of(ImmutableList.of(firstValueCoder, secondValueCoder));
+    KvCoder<K, RawUnionValue> kvCoder = KvCoder.of(JoinUtils.getKeyCoder(input), unionCoder);
+    PCollectionList<KV<K, RawUnionValue>> union =
+        PCollectionList.of(JoinUtils.makeUnionTable(0, input, kvCoder))
+            .and(JoinUtils.makeUnionTable(1, getSecondCollection(), kvCoder));
+    return union
+        .apply("Flatten", Flatten.pCollections())
+        .apply(
+            "Join",
+            ParDo.of(
+                new EventTimeEquiJoinDoFn<>(
+                    firstValueCoder,
+                    secondValueCoder,
+                    getFirstCollectionValidFor(),
+                    getSecondCollectionValidFor(),
+                    getAllowedLateness())))
+        .setCoder(KvCoder.of(keyCoder, PairCoder.<V1, V2>of(firstValueCoder, secondValueCoder)));
+  }
+
+  private static class EventTimeEquiJoinDoFn<K, V1, V2>
+      extends DoFn<KV<K, RawUnionValue>, KV<K, Pair<V1, V2>>> {
+    private static final int FIRST_TAG = 0;
+    private static final int SECOND_TAG = 1;
+
+    // Bucket cleanup timers into TIMER_BUCKET length buckets.
+    private static final long TIMER_BUCKET = Duration.standardMinutes(1).getMillis();
+
+    // How long elements in the first and second collection are valid (can be matched) for.
+    private final Duration firstCollectionValidFor;
+    private final Duration secondCollectionValidFor;
+
+    // How long past the watermark that late elements can show up.
+    private final Duration allowedLateness;
+
+    @StateId("v1Items")
+    private final StateSpec<OrderedListState<V1>> firstCollectionItems;
+
+    @StateId("v2Items")
+    private final StateSpec<OrderedListState<V2>> secondCollectionItems;
+
+    @TimerFamily("cleanupTimers")
+    private final TimerSpec cleanupTimers = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
+
+    public EventTimeEquiJoinDoFn(
+        Coder<V1> firstValueCoder,
+        Coder<V2> secondValueCoder,
+        Duration firstValidFor,
+        Duration secondValidFor,
+        Duration allowedLateness) {
+      this.firstCollectionValidFor = firstValidFor;
+      this.secondCollectionValidFor = secondValidFor;
+      this.allowedLateness = allowedLateness;
+      firstCollectionItems = StateSpecs.orderedList(firstValueCoder);
+      secondCollectionItems = StateSpecs.orderedList(secondValueCoder);
+    }
+
+    @FunctionalInterface
+    private interface Output<T1, T2> {
+      void apply(T1 one, T2 two);
+    }
+
+    private <T, O> void processHelper(
+        Output<T, O> output,
+        KV<K, RawUnionValue> element,
+        Instant ts,
+        OrderedListState<T> thisCollection,
+        OrderedListState<O> otherCollection,
+        TimerMap cleanupTimers,
+        Duration thisCollectionValidFor,
+        Duration otherCollectionValidFor) {
+      thisCollection.add(TimestampedValue.of((T) element.getValue().getValue(), ts));
+      Instant beginning = ts.minus(otherCollectionValidFor);
+      Instant end = ts.plus(thisCollectionValidFor).plus(1L);
+      for (TimestampedValue<O> value : otherCollection.readRange(beginning, end)) {
+        output.apply((T) element.getValue().getValue(), value.getValue());
+      }
+      Instant cleanupTime = ts.plus(allowedLateness).plus(thisCollectionValidFor);
+      Instant nextBucketStart =
+          Instant.ofEpochMilli(
+              cleanupTime.getMillis() / TIMER_BUCKET * TIMER_BUCKET + TIMER_BUCKET);
+      cleanupTimers.get(Long.toString(nextBucketStart.getMillis())).set(nextBucketStart);
+    }
+
+    @ProcessElement
+    public void process(
+        ProcessContext context,
+        @Element KV<K, RawUnionValue> element,
+        @Timestamp Instant ts,
+        @StateId("v1Items") OrderedListState<V1> firstItems,
+        @StateId("v2Items") OrderedListState<V2> secondItems,
+        @TimerFamily("cleanupTimers") TimerMap cleanupTimers) {
+      switch (element.getValue().getUnionTag()) {
+        case FIRST_TAG:
+          processHelper(
+              (V1 v1, V2 v2) -> {
+                context.output(KV.of(element.getKey(), Pair.of(v1, v2)));
+              },
+              element,
+              ts,
+              firstItems,
+              secondItems,
+              cleanupTimers,
+              firstCollectionValidFor,
+              secondCollectionValidFor);
+          break;
+        case SECOND_TAG:
+          processHelper(
+              (V2 v2, V1 v1) -> {
+                context.output(KV.of(element.getKey(), Pair.of(v1, v2)));
+              },
+              element,
+              ts,
+              secondItems,
+              firstItems,
+              cleanupTimers,
+              secondCollectionValidFor,
+              firstCollectionValidFor);
+      }
+    }
+
+    @OnTimerFamily("cleanupTimers")
+    public void onCleanupTimer(
+        @TimerId String timerId,
+        @StateId("v1Items") OrderedListState<V1> firstItems,
+        @StateId("v2Items") OrderedListState<V2> secondItems) {
+      Instant currentTime = Instant.ofEpochMilli(Long.valueOf(timerId)).minus(allowedLateness);
+      firstItems.clearRange(Instant.ofEpochMilli(0), currentTime.minus(firstCollectionValidFor));
+      secondItems.clearRange(Instant.ofEpochMilli(0), currentTime.minus(secondCollectionValidFor));

Review comment:
       ditto
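
       For context on the cleanup path in the hunk above: timers are keyed by the start of the next one-minute bucket after the cleanup time, so many elements can share a single timer, and the timer id is that bucket start rendered as a string. A minimal plain-Java sketch of the rounding, mirroring the arithmetic in `processHelper`; the class `TimerBuckets` and its method names are hypothetical and not part of the PR:

```java
final class TimerBuckets {
  // One minute in milliseconds, matching TIMER_BUCKET in the diff.
  static final long TIMER_BUCKET_MILLIS = 60_000L;

  /** Rounds down to the containing bucket, then moves to the start of the next one. */
  static long nextBucketStart(long cleanupMillis) {
    return cleanupMillis / TIMER_BUCKET_MILLIS * TIMER_BUCKET_MILLIS + TIMER_BUCKET_MILLIS;
  }

  /** The timer id is the bucket start as a decimal string. */
  static String timerId(long cleanupMillis) {
    return Long.toString(nextBucketStart(cleanupMillis));
  }

  /** Recovers the bucket start from a timer id, as the timer callback does. */
  static long bucketStartFromId(String timerId) {
    return Long.parseLong(timerId);
  }
}
```

       A cleanup time that already sits on a bucket boundary still moves to the following bucket, so the timer never fires earlier than the cleanup time itself.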

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/EventTimeEquiJoin.java
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.join;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.state.OrderedListState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.TimerMap;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} on two PCollections. A
+ * {@link EventTimeEquiJoin} joins elements with equal keys bounded by the difference in event time.
+ *
+ * <p>Example of performing a {@link EventTimeEquiJoin}:
+ *
+ * <pre>{@code
+ * PCollection<KV<K, V1>> pt1 = ...;
+ * PCollection<KV<K, V2>> pt2 = ...;
+ *
+ * PCollection<KV<K, Pair<V1, V2>> eventTimeEquiJoinCollection =
+ *   pt1.apply(EventTimeEquiJoin.<K, V1, V2>of(pt2));
+ *
+ * @param secondCollection the second collection to use in the join.
+ * @param <K> the type of the keys in the input {@code PCollection}s
+ * @param <V1> the type of the value in the first {@code PCollection}
+ * @param <V2> the type of the value in the second {@code PCollection}
+ * </pre>
+ */
+@AutoValue
+public abstract class EventTimeEquiJoin<K, V1, V2>
+    extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, Pair<V1, V2>>>> {
+  /** Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} on two PCollections. */
+  public static <K, V1, V2> EventTimeEquiJoin<K, V1, V2> of(
+      PCollection<KV<K, V2>> secondCollection) {
+    return new AutoValue_EventTimeEquiJoin.Builder<K, V1, V2>()
+        .setSecondCollection(secondCollection)
+        .setFirstCollectionValidFor(Duration.ZERO)
+        .setSecondCollectionValidFor(Duration.ZERO)
+        .setAllowedLateness(Duration.ZERO)
+        .build();
+  }
+
+  abstract PCollection<KV<K, V2>> getSecondCollection();
+
+  abstract Duration getFirstCollectionValidFor();
+
+  abstract Duration getSecondCollectionValidFor();
+
+  abstract Duration getAllowedLateness();
+
+  abstract Builder<K, V1, V2> toBuilder();
+
+  @AutoValue.Builder
+  public abstract static class Builder<K, V1, V2> {
+    public abstract Builder<K, V1, V2> setSecondCollection(PCollection<KV<K, V2>> value);
+
+    public abstract Builder<K, V1, V2> setFirstCollectionValidFor(Duration value);
+
+    public abstract Builder<K, V1, V2> setSecondCollectionValidFor(Duration value);
+
+    public abstract Builder<K, V1, V2> setAllowedLateness(Duration value);
+
+    abstract EventTimeEquiJoin<K, V1, V2> build();
+  }
+
+  /**
+   * Returns a {@code Impl<K, V1, V2>} {@code PTransform} that matches elements from the first and
+   * second collection with the same keys if their timestamps are within the given interval.
+   *
+   * @param interval the allowed difference between the timestamps to allow a match
+   */
+  public EventTimeEquiJoin<K, V1, V2> within(Duration interval) {
+    return toBuilder()
+        .setFirstCollectionValidFor(interval)
+        .setSecondCollectionValidFor(interval)
+        .build();
+  }
+
+  /**
+   * Returns a {@code Impl<K, V1, V2>} {@code PTransform} that matches elements from the first and
+   * second collection with the same keys if the collection's element comes within the valid time
+   * range for the other collection.
+   *
+   * @param firstCollectionValidFor the valid time range for the first collection
+   * @param secondCollectionValidFor the valid time range for the second collection
+   */
+  public EventTimeEquiJoin<K, V1, V2> within(
+      Duration firstCollectionValidFor, Duration secondCollectionValidFor) {
+    return toBuilder()
+        .setFirstCollectionValidFor(firstCollectionValidFor)
+        .setSecondCollectionValidFor(secondCollectionValidFor)
+        .build();
+  }
+
+  /**
+   * Returns a {@code Impl<K, V1, V2>} {@code PTransform} that matches elements from the first and
+   * second collection with the same keys and allows for late elements
+   *
+   * @param allowedLateness the amount of time late elements are allowed.
+   */
+  public EventTimeEquiJoin<K, V1, V2> withAllowedLateness(Duration allowedLateness) {
+    return toBuilder().setAllowedLateness(allowedLateness).build();
+  }
+
+  @Override
+  public PCollection<KV<K, Pair<V1, V2>>> expand(PCollection<KV<K, V1>> input) {
+    Coder<K> keyCoder = JoinUtils.getKeyCoder(input);
+    Coder<V1> firstValueCoder = JoinUtils.getValueCoder(input);
+    Coder<V2> secondValueCoder = JoinUtils.getValueCoder(getSecondCollection());
+    UnionCoder unionCoder = UnionCoder.of(ImmutableList.of(firstValueCoder, secondValueCoder));
+    KvCoder<K, RawUnionValue> kvCoder = KvCoder.of(JoinUtils.getKeyCoder(input), unionCoder);
+    PCollectionList<KV<K, RawUnionValue>> union =
+        PCollectionList.of(JoinUtils.makeUnionTable(0, input, kvCoder))
+            .and(JoinUtils.makeUnionTable(1, getSecondCollection(), kvCoder));
+    return union
+        .apply("Flatten", Flatten.pCollections())
+        .apply(
+            "Join",
+            ParDo.of(
+                new EventTimeEquiJoinDoFn<>(
+                    firstValueCoder,
+                    secondValueCoder,
+                    getFirstCollectionValidFor(),
+                    getSecondCollectionValidFor(),
+                    getAllowedLateness())))
+        .setCoder(KvCoder.of(keyCoder, PairCoder.<V1, V2>of(firstValueCoder, secondValueCoder)));
+  }
+
+  private static class EventTimeEquiJoinDoFn<K, V1, V2>
+      extends DoFn<KV<K, RawUnionValue>, KV<K, Pair<V1, V2>>> {
+    private static final int FIRST_TAG = 0;
+    private static final int SECOND_TAG = 1;
+
+    // Bucket cleanup timers into TIMER_BUCKET length buckets.
+    private static final long TIMER_BUCKET = Duration.standardMinutes(1).getMillis();
+
+    // How long elements in the first and second collection are valid (can be matched) for.
+    private final Duration firstCollectionValidFor;
+    private final Duration secondCollectionValidFor;
+
+    // How long past the watermark that late elements can show up.
+    private final Duration allowedLateness;
+
+    @StateId("v1Items")
+    private final StateSpec<OrderedListState<V1>> firstCollectionItems;
+
+    @StateId("v2Items")
+    private final StateSpec<OrderedListState<V2>> secondCollectionItems;
+
+    @TimerFamily("cleanupTimers")
+    private final TimerSpec cleanupTimers = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
+
+    public EventTimeEquiJoinDoFn(
+        Coder<V1> firstValueCoder,
+        Coder<V2> secondValueCoder,
+        Duration firstValidFor,
+        Duration secondValidFor,
+        Duration allowedLateness) {
+      this.firstCollectionValidFor = firstValidFor;
+      this.secondCollectionValidFor = secondValidFor;
+      this.allowedLateness = allowedLateness;
+      firstCollectionItems = StateSpecs.orderedList(firstValueCoder);
+      secondCollectionItems = StateSpecs.orderedList(secondValueCoder);
+    }
+
+    @FunctionalInterface
+    private interface Output<T1, T2> {
+      void apply(T1 one, T2 two);
+    }
+
+    private <T, O> void processHelper(
+        Output<T, O> output,
+        KV<K, RawUnionValue> element,
+        Instant ts,
+        OrderedListState<T> thisCollection,
+        OrderedListState<O> otherCollection,
+        TimerMap cleanupTimers,
+        Duration thisCollectionValidFor,
+        Duration otherCollectionValidFor) {
+      thisCollection.add(TimestampedValue.of((T) element.getValue().getValue(), ts));
+      Instant beginning = ts.minus(otherCollectionValidFor);
+      Instant end = ts.plus(thisCollectionValidFor).plus(1L);
+      for (TimestampedValue<O> value : otherCollection.readRange(beginning, end)) {
+        output.apply((T) element.getValue().getValue(), value.getValue());
+      }
+      Instant cleanupTime = ts.plus(allowedLateness).plus(thisCollectionValidFor);
+      Instant nextBucketStart =
+          Instant.ofEpochMilli(
+              cleanupTime.getMillis() / TIMER_BUCKET * TIMER_BUCKET + TIMER_BUCKET);
+      cleanupTimers.get(Long.toString(nextBucketStart.getMillis())).set(nextBucketStart);
+    }
+
+    @ProcessElement
+    public void process(
+        ProcessContext context,

Review comment:
       No need for ProcessContext. Just add an OutputReceiver parameter instead.
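
       In Beam, a `@ProcessElement` method can declare an `OutputReceiver<OutputT>` parameter and call `output(...)` on it, so the method only depends on the one capability it uses. The sketch below illustrates that shape in plain Java without a Beam dependency. The nested `Receiver` interface is a hypothetical stand-in for Beam's `OutputReceiver` that models only `output(T)`, and `ReceiverSketch`, `emitMatches` and `run` are illustrative names, not code from the PR:

```java
import java.util.ArrayList;
import java.util.List;

final class ReceiverSketch {
  /** Hypothetical stand-in for Beam's OutputReceiver; only output(T) is modeled. */
  interface Receiver<T> {
    void output(T value);
  }

  /** Emits one "key:left,right" string per right-hand match through the receiver. */
  static void emitMatches(String key, String left, List<String> rights, Receiver<String> out) {
    for (String right : rights) {
      out.output(key + ":" + left + "," + right);
    }
  }

  /** Collects everything emitted into a list, standing in for the runner's output. */
  static List<String> run(String key, String left, List<String> rights) {
    List<String> collected = new ArrayList<>();
    emitMatches(key, left, rights, collected::add);
    return collected;
  }
}
```

       Because the helper takes only a receiver, a test can pass a list-backed lambda instead of constructing a full processing context.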

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/EventTimeEquiJoin.java
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.join;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.state.OrderedListState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.TimerMap;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} on two PCollections. A
+ * {@link EventTimeEquiJoin} joins elements with equal keys bounded by the difference in event time.
+ *
+ * <p>Example of performing a {@link EventTimeEquiJoin}:
+ *
+ * <pre>{@code
+ * PCollection<KV<K, V1>> pt1 = ...;
+ * PCollection<KV<K, V2>> pt2 = ...;
+ *
+ * PCollection<KV<K, Pair<V1, V2>> eventTimeEquiJoinCollection =
+ *   pt1.apply(EventTimeEquiJoin.<K, V1, V2>of(pt2));
+ *
+ * @param secondCollection the second collection to use in the join.
+ * @param <K> the type of the keys in the input {@code PCollection}s
+ * @param <V1> the type of the value in the first {@code PCollection}
+ * @param <V2> the type of the value in the second {@code PCollection}
+ * </pre>
+ */
+@AutoValue
+public abstract class EventTimeEquiJoin<K, V1, V2>
+    extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, Pair<V1, V2>>>> {
+  /** Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} on two PCollections. */
+  public static <K, V1, V2> EventTimeEquiJoin<K, V1, V2> of(
+      PCollection<KV<K, V2>> secondCollection) {
+    return new AutoValue_EventTimeEquiJoin.Builder<K, V1, V2>()
+        .setSecondCollection(secondCollection)
+        .setFirstCollectionValidFor(Duration.ZERO)
+        .setSecondCollectionValidFor(Duration.ZERO)
+        .setAllowedLateness(Duration.ZERO)
+        .build();
+  }
+
+  abstract PCollection<KV<K, V2>> getSecondCollection();
+
+  abstract Duration getFirstCollectionValidFor();
+
+  abstract Duration getSecondCollectionValidFor();
+
+  abstract Duration getAllowedLateness();
+
+  abstract Builder<K, V1, V2> toBuilder();
+
+  @AutoValue.Builder
+  public abstract static class Builder<K, V1, V2> {
+    public abstract Builder<K, V1, V2> setSecondCollection(PCollection<KV<K, V2>> value);
+
+    public abstract Builder<K, V1, V2> setFirstCollectionValidFor(Duration value);
+
+    public abstract Builder<K, V1, V2> setSecondCollectionValidFor(Duration value);
+
+    public abstract Builder<K, V1, V2> setAllowedLateness(Duration value);
+
+    abstract EventTimeEquiJoin<K, V1, V2> build();
+  }
+
+  /**
+   * Returns a {@code Impl<K, V1, V2>} {@code PTransform} that matches elements from the first and
+   * second collection with the same keys if their timestamps are within the given interval.
+   *
+   * @param interval the allowed difference between the timestamps to allow a match
+   */
+  public EventTimeEquiJoin<K, V1, V2> within(Duration interval) {
+    return toBuilder()
+        .setFirstCollectionValidFor(interval)
+        .setSecondCollectionValidFor(interval)
+        .build();
+  }
+
+  /**
+   * Returns a {@code Impl<K, V1, V2>} {@code PTransform} that matches elements from the first and
+   * second collection with the same keys if the collection's element comes within the valid time
+   * range for the other collection.
+   *
+   * @param firstCollectionValidFor the valid time range for the first collection
+   * @param secondCollectionValidFor the valid time range for the second collection
+   */
+  public EventTimeEquiJoin<K, V1, V2> within(
+      Duration firstCollectionValidFor, Duration secondCollectionValidFor) {
+    return toBuilder()
+        .setFirstCollectionValidFor(firstCollectionValidFor)
+        .setSecondCollectionValidFor(secondCollectionValidFor)
+        .build();
+  }
+
+  /**
+   * Returns a {@code Impl<K, V1, V2>} {@code PTransform} that matches elements from the first and
+   * second collection with the same keys and allows for late elements
+   *
+   * @param allowedLateness the amount of time late elements are allowed.
+   */
+  public EventTimeEquiJoin<K, V1, V2> withAllowedLateness(Duration allowedLateness) {
+    return toBuilder().setAllowedLateness(allowedLateness).build();
+  }
+
+  @Override
+  public PCollection<KV<K, Pair<V1, V2>>> expand(PCollection<KV<K, V1>> input) {
+    Coder<K> keyCoder = JoinUtils.getKeyCoder(input);
+    Coder<V1> firstValueCoder = JoinUtils.getValueCoder(input);
+    Coder<V2> secondValueCoder = JoinUtils.getValueCoder(getSecondCollection());
+    UnionCoder unionCoder = UnionCoder.of(ImmutableList.of(firstValueCoder, secondValueCoder));
+    KvCoder<K, RawUnionValue> kvCoder = KvCoder.of(JoinUtils.getKeyCoder(input), unionCoder);
+    PCollectionList<KV<K, RawUnionValue>> union =
+        PCollectionList.of(JoinUtils.makeUnionTable(0, input, kvCoder))
+            .and(JoinUtils.makeUnionTable(1, getSecondCollection(), kvCoder));
+    return union
+        .apply("Flatten", Flatten.pCollections())
+        .apply(
+            "Join",
+            ParDo.of(
+                new EventTimeEquiJoinDoFn<>(
+                    firstValueCoder,
+                    secondValueCoder,
+                    getFirstCollectionValidFor(),
+                    getSecondCollectionValidFor(),
+                    getAllowedLateness())))
+        .setCoder(KvCoder.of(keyCoder, PairCoder.<V1, V2>of(firstValueCoder, secondValueCoder)));
+  }
+
+  private static class EventTimeEquiJoinDoFn<K, V1, V2>
+      extends DoFn<KV<K, RawUnionValue>, KV<K, Pair<V1, V2>>> {
+    private static final int FIRST_TAG = 0;
+    private static final int SECOND_TAG = 1;
+
+    // Bucket cleanup timers into TIMER_BUCKET length buckets.
+    private static final long TIMER_BUCKET = Duration.standardMinutes(1).getMillis();
+
+    // How long elements in the first and second collection are valid (can be matched) for.
+    private final Duration firstCollectionValidFor;
+    private final Duration secondCollectionValidFor;
+
+    // How long past the watermark that late elements can show up.
+    private final Duration allowedLateness;
+
+    @StateId("v1Items")
+    private final StateSpec<OrderedListState<V1>> firstCollectionItems;
+
+    @StateId("v2Items")
+    private final StateSpec<OrderedListState<V2>> secondCollectionItems;
+
+    @TimerFamily("cleanupTimers")
+    private final TimerSpec cleanupTimers = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
+
+    public EventTimeEquiJoinDoFn(
+        Coder<V1> firstValueCoder,
+        Coder<V2> secondValueCoder,
+        Duration firstValidFor,
+        Duration secondValidFor,
+        Duration allowedLateness) {
+      this.firstCollectionValidFor = firstValidFor;
+      this.secondCollectionValidFor = secondValidFor;
+      this.allowedLateness = allowedLateness;
+      firstCollectionItems = StateSpecs.orderedList(firstValueCoder);
+      secondCollectionItems = StateSpecs.orderedList(secondValueCoder);
+    }
+
+    @FunctionalInterface
+    private interface Output<T1, T2> {
+      void apply(T1 one, T2 two);
+    }
+
+    private <T, O> void processHelper(
+        Output<T, O> output,
+        KV<K, RawUnionValue> element,
+        Instant ts,
+        OrderedListState<T> thisCollection,
+        OrderedListState<O> otherCollection,
+        TimerMap cleanupTimers,
+        Duration thisCollectionValidFor,
+        Duration otherCollectionValidFor) {
+      thisCollection.add(TimestampedValue.of((T) element.getValue().getValue(), ts));
+      Instant beginning = ts.minus(otherCollectionValidFor);
+      Instant end = ts.plus(thisCollectionValidFor).plus(1L);
+      for (TimestampedValue<O> value : otherCollection.readRange(beginning, end)) {
+        output.apply((T) element.getValue().getValue(), value.getValue());
+      }
+      Instant cleanupTime = ts.plus(allowedLateness).plus(thisCollectionValidFor);
+      Instant nextBucketStart =
+          Instant.ofEpochMilli(
+              cleanupTime.getMillis() / TIMER_BUCKET * TIMER_BUCKET + TIMER_BUCKET);
+      cleanupTimers.get(Long.toString(nextBucketStart.getMillis())).set(nextBucketStart);
+    }
+
+    @ProcessElement
+    public void process(
+        ProcessContext context,
+        @Element KV<K, RawUnionValue> element,
+        @Timestamp Instant ts,
+        @StateId("v1Items") OrderedListState<V1> firstItems,
+        @StateId("v2Items") OrderedListState<V2> secondItems,
+        @TimerFamily("cleanupTimers") TimerMap cleanupTimers) {
+      switch (element.getValue().getUnionTag()) {
+        case FIRST_TAG:
+          processHelper(
+              (V1 v1, V2 v2) -> {
+                context.output(KV.of(element.getKey(), Pair.of(v1, v2)));
+              },
+              element,
+              ts,
+              firstItems,
+              secondItems,
+              cleanupTimers,
+              firstCollectionValidFor,
+              secondCollectionValidFor);
+          break;
+        case SECOND_TAG:
+          processHelper(
+              (V2 v2, V1 v1) -> {
+                context.output(KV.of(element.getKey(), Pair.of(v1, v2)));
+              },
+              element,
+              ts,
+              secondItems,
+              firstItems,
+              cleanupTimers,
+              secondCollectionValidFor,
+              firstCollectionValidFor);
+      }
+    }
+
+    @OnTimerFamily("cleanupTimers")
+    public void onCleanupTimer(
+        @TimerId String timerId,

Review comment:
       You could add a @Timestamp parameter; then you don't need to parse the timestamp out of the string timerId.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/EventTimeEquiJoin.java
##########
@@ -0,0 +1,272 @@
+    private <T, O> void processHelper(
+        Output<T, O> output,
+        KV<K, RawUnionValue> element,
+        Instant ts,
+        OrderedListState<T> thisCollection,
+        OrderedListState<O> otherCollection,
+        TimerMap cleanupTimers,
+        Duration thisCollectionValidFor,
+        Duration otherCollectionValidFor) {
+      thisCollection.add(TimestampedValue.of((T) element.getValue().getValue(), ts));
+      Instant beginning = ts.minus(otherCollectionValidFor);
+      Instant end = ts.plus(thisCollectionValidFor).plus(1L);
+      for (TimestampedValue<O> value : otherCollection.readRange(beginning, end)) {
+        output.apply((T) element.getValue().getValue(), value.getValue());
+      }
+      Instant cleanupTime = ts.plus(allowedLateness).plus(thisCollectionValidFor);
+      Instant nextBucketStart =
+          Instant.ofEpochMilli(
+              cleanupTime.getMillis() / TIMER_BUCKET * TIMER_BUCKET + TIMER_BUCKET);
+      cleanupTimers.get(Long.toString(nextBucketStart.getMillis())).set(nextBucketStart);

Review comment:
       Ideally we should call withOutputTimestamp to hold the watermark back to the earliest buffered element in that timer's range. Doing this probably requires keeping a histogram in a ValueState that tracks the min timestamp per minute bucket.
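
A minimal sketch of the bookkeeping this comment describes, written in plain Java rather than against Beam state. The class name BucketMinTracker, its method names, and the TreeMap standing in for the ValueState histogram are assumptions made for illustration; only the bucket rounding is taken from processHelper in the diff. The value returned by record() is the kind of timestamp one might then pass to withOutputTimestamp when setting the bucket's cleanup timer.

```java
import java.util.TreeMap;

/** Hypothetical bookkeeping sketch; not part of the PR and not a Beam API. */
final class BucketMinTracker {
  // Same bucket width as TIMER_BUCKET in the diff: one minute, in milliseconds.
  static final long TIMER_BUCKET_MILLIS = 60_000L;

  // Bucket boundary (millis) -> earliest element timestamp (millis) buffered for that bucket.
  private final TreeMap<Long, Long> minTimestampByBucket = new TreeMap<>();

  /** Mirrors the rounding in processHelper: round down to a boundary, then add one bucket. */
  static long nextBucketStart(long cleanupTimeMillis) {
    return cleanupTimeMillis / TIMER_BUCKET_MILLIS * TIMER_BUCKET_MILLIS + TIMER_BUCKET_MILLIS;
  }

  /**
   * Records an element and returns the earliest element timestamp seen so far for the
   * bucket its cleanup time falls into.
   */
  long record(long elementTimestampMillis, long cleanupTimeMillis) {
    long bucket = nextBucketStart(cleanupTimeMillis);
    return minTimestampByBucket.merge(bucket, elementTimestampMillis, Long::min);
  }

  /** Drops the entry for a bucket, for example once its cleanup timer has fired. */
  void clear(long bucketMillis) {
    minTimestampByBucket.remove(bucketMillis);
  }
}
```

Keeping one entry per bucket rather than per element bounds the size of the extra state by the number of outstanding cleanup timers.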

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/EventTimeEquiJoin.java
##########
@@ -0,0 +1,272 @@
+/**
+ * Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} on two PCollections. A
+ * {@link EventTimeEquiJoin} joins elements with equal keys bounded by the difference in event time.
+ *
+ * <p>Example of performing a {@link EventTimeEquiJoin}:
+ *
+ * <pre>{@code
+ * PCollection<KV<K, V1>> pt1 = ...;
+ * PCollection<KV<K, V2>> pt2 = ...;
+ *
+ * PCollection<KV<K, Pair<V1, V2>> eventTimeEquiJoinCollection =
+ *   pt1.apply(EventTimeEquiJoin.<K, V1, V2>of(pt2));

Review comment:
       This is currently just an inner join




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] laraschmidt commented on a change in pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

Posted by GitBox <gi...@apache.org>.
laraschmidt commented on a change in pull request #15275:
URL: https://github.com/apache/beam/pull/15275#discussion_r683658521



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/EventTimeEquiJoin.java
##########
@@ -0,0 +1,272 @@
+/**
+ * Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} on two PCollections. A
+ * {@link EventTimeEquiJoin} joins elements with equal keys bounded by the difference in event time.
+ *
+ * <p>Example of performing a {@link EventTimeEquiJoin}:
+ *
+ * <pre>{@code
+ * PCollection<KV<K, V1>> pt1 = ...;
+ * PCollection<KV<K, V2>> pt2 = ...;
+ *
+ * PCollection<KV<K, Pair<V1, V2>> eventTimeEquiJoinCollection =
+ *   pt1.apply(EventTimeEquiJoin.<K, V1, V2>of(pt2));

Review comment:
       Renamed of() to innerJoin(), mentioned inner join in the method comments, and noted in the header comment that only inner join is currently supported.







[GitHub] [beam] laraschmidt commented on pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

Posted by GitBox <gi...@apache.org>.
laraschmidt commented on pull request #15275:
URL: https://github.com/apache/beam/pull/15275#issuecomment-1007822342


   We use the same watermark hold for a bucket of elements (e.g. those arriving between t and t+e), and we just hold it back until t+e. So, from what I can tell, it shouldn't matter that they arrive out of order.
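
One way to read the argument above, sketched in plain Java under the assumption that the bucket's end is computed by the same round-down-then-add-one-bucket arithmetic as the cleanup timers in the diff. The names SharedBucketHold, holdFor, and holds are hypothetical. Every timestamp inside one bucket maps to the same value, so the set of holds does not depend on arrival order.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical illustration; not part of the PR and not a Beam API. */
final class SharedBucketHold {
  /** End of the bucket of width bucketMillis that contains timestampMillis. */
  static long holdFor(long timestampMillis, long bucketMillis) {
    return timestampMillis / bucketMillis * bucketMillis + bucketMillis;
  }

  /** Distinct holds produced by a sequence of arrivals, in whatever order they come. */
  static Set<Long> holds(List<Long> arrivals, long bucketMillis) {
    return arrivals.stream()
        .map(ts -> holdFor(ts, bucketMillis))
        .collect(Collectors.toSet());
  }
}
```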





[GitHub] [beam] reuvenlax commented on pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15275:
URL: https://github.com/apache/beam/pull/15275#issuecomment-1007610045


   But you needed to make that function apply to timers as well, right?
   
   On Fri, Jan 7, 2022 at 9:10 AM laraschmidt ***@***.***> wrote:
   
   > The other one is in but it does not work for portable runner because
   > making it work for portable runner breaks dataflow on portable runner. :)
   >
   > But the changes can actually be treated as pretty independent. That one
   > was making the function we use here not as deprecated but we can still use
   > it in this PR even if deprecated. So this is still ready for review.
   





[GitHub] [beam] laraschmidt commented on pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

Posted by GitBox <gi...@apache.org>.
laraschmidt commented on pull request #15275:
URL: https://github.com/apache/beam/pull/15275#issuecomment-941305146


   Addressed comments, PTAL. :) Thanks! @kennknowles 





[GitHub] [beam] reuvenlax commented on a change in pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on a change in pull request #15275:
URL: https://github.com/apache/beam/pull/15275#discussion_r688052797



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/EventTimeEquiJoin.java
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.join;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.state.OrderedListState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.TimerMap;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} on two PCollections. A
+ * {@link EventTimeEquiJoin} joins elements with equal keys bounded by the difference in event time.
+ * Currently only inner join is supported.
+ *
+ * <p>Example of performing a {@link EventTimeEquiJoin}:
+ *
+ * <pre>{@code
+ * PCollection<KV<K, V1>> pt1 = ...;
+ * PCollection<KV<K, V2>> pt2 = ...;
+ *
+ * PCollection<KV<K, Pair<V1, V2>>> eventTimeEquiJoinCollection =
+ *   pt1.apply(EventTimeEquiJoin.<K, V1, V2>innerJoin(pt2));
+ *
+ * @param secondCollection the second collection to use in the join.
+ * @param <K> the type of the keys in the input {@code PCollection}s
+ * @param <V1> the type of the value in the first {@code PCollection}
+ * @param <V2> the type of the value in the second {@code PCollection}
+ * </pre>
+ */
+@AutoValue
+public abstract class EventTimeEquiJoin<K, V1, V2>
+    extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, Pair<V1, V2>>>> {
+  /**
+   * Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} inner join on two
+   * PCollections.
+   */
+  public static <K, V1, V2> EventTimeEquiJoin<K, V1, V2> innerJoin(
+      PCollection<KV<K, V2>> secondCollection) {
+    return new AutoValue_EventTimeEquiJoin.Builder<K, V1, V2>()
+        .setSecondCollection(secondCollection)
+        .setFirstCollectionValidFor(Duration.ZERO)
+        .setSecondCollectionValidFor(Duration.ZERO)
+        .setAllowedLateness(Duration.ZERO)
+        .build();
+  }
+
+  abstract PCollection<KV<K, V2>> getSecondCollection();
+
+  abstract Duration getFirstCollectionValidFor();
+
+  abstract Duration getSecondCollectionValidFor();
+
+  abstract Duration getAllowedLateness();
+
+  abstract Builder<K, V1, V2> toBuilder();
+
+  @AutoValue.Builder
+  public abstract static class Builder<K, V1, V2> {
+    public abstract Builder<K, V1, V2> setSecondCollection(PCollection<KV<K, V2>> value);
+
+    public abstract Builder<K, V1, V2> setFirstCollectionValidFor(Duration value);
+
+    public abstract Builder<K, V1, V2> setSecondCollectionValidFor(Duration value);
+
+    public abstract Builder<K, V1, V2> setAllowedLateness(Duration value);
+
+    abstract EventTimeEquiJoin<K, V1, V2> build();
+  }
+
+  /**
+   * Returns a {@code Impl<K, V1, V2>} {@code PTransform} that matches elements from the first and
+   * second collection with the same keys if their timestamps are within the given interval.
+   *
+   * @param interval the allowed difference between the timestamps to allow a match
+   */
+  public EventTimeEquiJoin<K, V1, V2> within(Duration interval) {
+    return toBuilder()
+        .setFirstCollectionValidFor(interval)
+        .setSecondCollectionValidFor(interval)
+        .build();
+  }
+
+  /**
+   * Returns a {@code Impl<K, V1, V2>} {@code PTransform} that matches elements from the first and
+   * second collection with the same keys if the collection's element comes within the valid time
+   * range for the other collection.
+   *
+   * @param firstCollectionValidFor the valid time range for the first collection
+   * @param secondCollectionValidFor the valid time range for the second collection
+   */
+  public EventTimeEquiJoin<K, V1, V2> within(
+      Duration firstCollectionValidFor, Duration secondCollectionValidFor) {
+    return toBuilder()
+        .setFirstCollectionValidFor(firstCollectionValidFor)
+        .setSecondCollectionValidFor(secondCollectionValidFor)
+        .build();
+  }
+
+  /**
+   * Returns a {@code Impl<K, V1, V2>} {@code PTransform} that matches elements from the first and
+   * second collection with the same keys and allows for late elements
+   *
+   * @param allowedLateness the amount of time late elements are allowed.
+   */
+  public EventTimeEquiJoin<K, V1, V2> withAllowedLateness(Duration allowedLateness) {
+    return toBuilder().setAllowedLateness(allowedLateness).build();
+  }
+
+  @Override
+  public PCollection<KV<K, Pair<V1, V2>>> expand(PCollection<KV<K, V1>> input) {
+    Coder<K> keyCoder = JoinUtils.getKeyCoder(input);
+    Coder<V1> firstValueCoder = JoinUtils.getValueCoder(input);
+    Coder<V2> secondValueCoder = JoinUtils.getValueCoder(getSecondCollection());
+    UnionCoder unionCoder = UnionCoder.of(ImmutableList.of(firstValueCoder, secondValueCoder));
+    KvCoder<K, RawUnionValue> kvCoder = KvCoder.of(JoinUtils.getKeyCoder(input), unionCoder);
+    PCollectionList<KV<K, RawUnionValue>> union =
+        PCollectionList.of(JoinUtils.makeUnionTable(0, input, kvCoder))
+            .and(JoinUtils.makeUnionTable(1, getSecondCollection(), kvCoder));
+    return union
+        .apply("Flatten", Flatten.pCollections())
+        .apply(
+            "Join",
+            ParDo.of(
+                new EventTimeEquiJoinDoFn<>(
+                    firstValueCoder,
+                    secondValueCoder,
+                    getFirstCollectionValidFor(),
+                    getSecondCollectionValidFor(),
+                    getAllowedLateness())))
+        .setCoder(KvCoder.of(keyCoder, PairCoder.<V1, V2>of(firstValueCoder, secondValueCoder)));
+  }
+
+  private static class EventTimeEquiJoinDoFn<K, V1, V2>
+      extends DoFn<KV<K, RawUnionValue>, KV<K, Pair<V1, V2>>> {
+    private static final int FIRST_TAG = 0;
+    private static final int SECOND_TAG = 1;
+
+    // Bucket cleanup timers into TIMER_BUCKET length buckets.
+    private static final long TIMER_BUCKET = Duration.standardMinutes(1).getMillis();
+
+    // How long elements in the first and second collection are valid (can be matched) for.
+    private final Duration firstCollectionValidFor;
+    private final Duration secondCollectionValidFor;
+
+    // How long past the watermark that late elements can show up.
+    private final Duration allowedLateness;
+
+    @StateId("v1Items")
+    private final StateSpec<OrderedListState<V1>> firstCollectionItems;
+
+    @StateId("v2Items")
+    private final StateSpec<OrderedListState<V2>> secondCollectionItems;
+
+    @TimerFamily("cleanupTimers")
+    private final TimerSpec cleanupTimers = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
+
+    public EventTimeEquiJoinDoFn(
+        Coder<V1> firstValueCoder,
+        Coder<V2> secondValueCoder,
+        Duration firstValidFor,
+        Duration secondValidFor,
+        Duration allowedLateness) {
+      this.firstCollectionValidFor = firstValidFor;
+      this.secondCollectionValidFor = secondValidFor;
+      this.allowedLateness = allowedLateness;
+      firstCollectionItems = StateSpecs.orderedList(firstValueCoder);
+      secondCollectionItems = StateSpecs.orderedList(secondValueCoder);
+    }
+
+    @FunctionalInterface
+    private interface Output<T1, T2> {
+      void apply(T1 one, T2 two);
+    }
+
+    private <T, O> void processHelper(
+        Output<T, O> output,
+        KV<K, RawUnionValue> element,
+        Instant ts,
+        OrderedListState<T> thisCollection,
+        OrderedListState<O> otherCollection,
+        TimerMap cleanupTimers,
+        Duration thisCollectionValidFor,
+        Duration otherCollectionValidFor) {
+      thisCollection.add(TimestampedValue.of((T) element.getValue().getValue(), ts));
+      Instant beginning = ts.minus(otherCollectionValidFor);
+      Instant end = ts.plus(thisCollectionValidFor).plus(1L);
+      for (TimestampedValue<O> value : otherCollection.readRange(beginning, end)) {
+        output.apply((T) element.getValue().getValue(), value.getValue());
+      }
+      Instant cleanupTime = ts.plus(allowedLateness).plus(thisCollectionValidFor);

Review comment:
       If this element is late by more than allowedLateness, we'll still end up processing it above. We could store the max cleanup time in a state variable and skip processing the element entirely if it has an earlier timestamp.
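
The suggestion above can be sketched in plain Java, without any Beam types. This is one possible reading, not the PR's implementation: it assumes the stored value is the latest cleanup time that has already fired, and it skips an element only when that element's own cleanup time is not later than the stored value. The class `LateElementGate` and its methods are hypothetical names; in a real DoFn the `long` field would be a per-key state cell.

```java
/** Plain-Java stand-in for per-key state; none of this is Beam API. */
class LateElementGate {
  private final long allowedLatenessMillis;
  private final long validForMillis;
  // Latest cleanup time that has already fired; stands in for a per-key state cell.
  private long maxFiredCleanupMillis = Long.MIN_VALUE;

  LateElementGate(long allowedLatenessMillis, long validForMillis) {
    this.allowedLatenessMillis = allowedLatenessMillis;
    this.validForMillis = validForMillis;
  }

  /** Same arithmetic as cleanupTime in the diff: ts + allowedLateness + validFor. */
  long cleanupTimeFor(long tsMillis) {
    return tsMillis + allowedLatenessMillis + validForMillis;
  }

  /** Records that the cleanup timer for the given time has fired. */
  void onCleanupFired(long cleanupMillis) {
    maxFiredCleanupMillis = Math.max(maxFiredCleanupMillis, cleanupMillis);
  }

  /** Returns true if the element should be processed, false if it should be skipped. */
  boolean shouldProcess(long tsMillis) {
    return cleanupTimeFor(tsMillis) > maxFiredCleanupMillis;
  }
}
```

The check is conservative: it skips only elements whose buffered state would already have been cleaned up, so an element that is slightly later than allowedLateness can still pass.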




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] laraschmidt edited a comment on pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

Posted by GitBox <gi...@apache.org>.
laraschmidt edited a comment on pull request #15275:
URL: https://github.com/apache/beam/pull/15275#issuecomment-1007632856


   That was for an earlier iteration. The solution we ended up with outputs only from processElement and uses timers only to hold the watermark. So we didn't need to allow older elements from timers, because we accepted that we will allow older elements from the processElement function.
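
To illustrate why output can come from element processing alone, here is a minimal plain-Java sketch of a buffer-and-match-on-arrival join for a single key. It is not the PR's code: the class `ArrivalTimeJoinSketch`, its `String` values, and the `TreeMap` buffers (standing in for ordered list state) are assumptions made for illustration. The range bounds follow the `beginning` and `end` computation quoted in the diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Single-key simulation; TreeMap buffers stand in for ordered list state. */
class ArrivalTimeJoinSketch {
  private final NavigableMap<Long, List<String>> first = new TreeMap<>();
  private final NavigableMap<Long, List<String>> second = new TreeMap<>();
  private final long firstValidFor;
  private final long secondValidFor;
  final List<String> output = new ArrayList<>();

  ArrivalTimeJoinSketch(long firstValidForMillis, long secondValidForMillis) {
    this.firstValidFor = firstValidForMillis;
    this.secondValidFor = secondValidForMillis;
  }

  /** Buffers an element of the first collection and emits matches immediately. */
  void onFirst(String value, long ts) {
    first.computeIfAbsent(ts, k -> new ArrayList<>()).add(value);
    // Range [ts - otherValidFor, ts + thisValidFor + 1), as in the quoted diff.
    for (List<String> bucket :
        second.subMap(ts - secondValidFor, true, ts + firstValidFor + 1, false).values()) {
      for (String other : bucket) {
        output.add(value + "," + other);
      }
    }
  }

  /** Buffers an element of the second collection and emits matches immediately. */
  void onSecond(String value, long ts) {
    second.computeIfAbsent(ts, k -> new ArrayList<>()).add(value);
    for (List<String> bucket :
        first.subMap(ts - firstValidFor, true, ts + secondValidFor + 1, false).values()) {
      for (String other : bucket) {
        output.add(other + "," + value);
      }
    }
  }
}
```

Each matching pair is emitted once, when the later-arriving element of the pair is processed, so nothing has to be emitted from a timer callback. The sketch omits cleanup of the buffers.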





[GitHub] [beam] laraschmidt commented on pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

Posted by GitBox <gi...@apache.org>.
laraschmidt commented on pull request #15275:
URL: https://github.com/apache/beam/pull/15275#issuecomment-1007632856


   That was for an earlier iteration. The solution we ended up with outputs only from the actual DoFn and uses timers only to hold the watermark. So we didn't need to allow older elements from timers, because we accepted that we will allow older elements from the actual DoFn.





[GitHub] [beam] aaltay commented on pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

Posted by GitBox <gi...@apache.org>.
aaltay commented on pull request #15275:
URL: https://github.com/apache/beam/pull/15275#issuecomment-931638191


   @kennknowles @reuvenlax - Could you please take a look at this?
   
   @laraschmidt - Could you look at the failing precommit test or re-run it?





[GitHub] [beam] laraschmidt commented on pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

Posted by GitBox <gi...@apache.org>.
laraschmidt commented on pull request #15275:
URL: https://github.com/apache/beam/pull/15275#issuecomment-933725808


   Run Java_Examples_Dataflow PreCommit





[GitHub] [beam] laraschmidt commented on a change in pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

Posted by GitBox <gi...@apache.org>.
laraschmidt commented on a change in pull request #15275:
URL: https://github.com/apache/beam/pull/15275#discussion_r726669554



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/EventTimeEquiJoin.java
##########
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.join;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.state.OrderedListState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerMap;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} on two PCollections. A
+ * {@link EventTimeEquiJoin} joins elements with equal keys bounded by the difference in event time.
+ * Currently only inner join is supported.
+ *
+ * <p>Example of performing a {@link EventTimeEquiJoin}:
+ *
+ * <pre>{@code
+ * PCollection<KV<K, V1>> pt1 = ...;
+ * PCollection<KV<K, V2>> pt2 = ...;
+ *
+ * PCollection<KV<K, Pair<V1, V2>>> eventTimeEquiJoinCollection =
+ *   pt1.apply(EventTimeEquiJoin.<K, V1, V2>of(pt2));
+ *
+ * @param <K> the type of the keys in the input {@code PCollection}s
+ * @param <V1> the type of the value in the first {@code PCollection}
+ * @param <V2> the type of the value in the second {@code PCollection}
+ * </pre>
+ */
+@AutoValue

Review comment:
       done







[GitHub] [beam] laraschmidt commented on pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

Posted by GitBox <gi...@apache.org>.
laraschmidt commented on pull request #15275:
URL: https://github.com/apache/beam/pull/15275#issuecomment-933853591









[GitHub] [beam] laraschmidt commented on a change in pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

Posted by GitBox <gi...@apache.org>.
laraschmidt commented on a change in pull request #15275:
URL: https://github.com/apache/beam/pull/15275#discussion_r694343407



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/EventTimeEquiJoin.java
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.join;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.state.OrderedListState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.TimerMap;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} on two PCollections. A
+ * {@link EventTimeEquiJoin} joins elements with equal keys bounded by the difference in event time.
+ * Currently only inner join is supported.
+ *
+ * <p>Example of performing a {@link EventTimeEquiJoin}:
+ *
+ * <pre>{@code
+ * PCollection<KV<K, V1>> pt1 = ...;
+ * PCollection<KV<K, V2>> pt2 = ...;
+ *
+ * PCollection<KV<K, Pair<V1, V2>>> eventTimeEquiJoinCollection =
+ *   pt1.apply(EventTimeEquiJoin.<K, V1, V2>innerJoin(pt2));
+ *
+ * @param secondCollection the second collection to use in the join.
+ * @param <K> the type of the keys in the input {@code PCollection}s
+ * @param <V1> the type of the value in the first {@code PCollection}
+ * @param <V2> the type of the value in the second {@code PCollection}
+ * </pre>
+ */
+@AutoValue
+public abstract class EventTimeEquiJoin<K, V1, V2>
+    extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, Pair<V1, V2>>>> {
+  /**
+   * Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} inner join on two
+   * PCollections.
+   */
+  public static <K, V1, V2> EventTimeEquiJoin<K, V1, V2> innerJoin(
+      PCollection<KV<K, V2>> secondCollection) {
+    return new AutoValue_EventTimeEquiJoin.Builder<K, V1, V2>()
+        .setSecondCollection(secondCollection)
+        .setFirstCollectionValidFor(Duration.ZERO)
+        .setSecondCollectionValidFor(Duration.ZERO)
+        .setAllowedLateness(Duration.ZERO)
+        .build();
+  }
+
+  abstract PCollection<KV<K, V2>> getSecondCollection();
+
+  abstract Duration getFirstCollectionValidFor();
+
+  abstract Duration getSecondCollectionValidFor();
+
+  abstract Duration getAllowedLateness();
+
+  abstract Builder<K, V1, V2> toBuilder();
+
+  @AutoValue.Builder
+  public abstract static class Builder<K, V1, V2> {
+    public abstract Builder<K, V1, V2> setSecondCollection(PCollection<KV<K, V2>> value);
+
+    public abstract Builder<K, V1, V2> setFirstCollectionValidFor(Duration value);
+
+    public abstract Builder<K, V1, V2> setSecondCollectionValidFor(Duration value);
+
+    public abstract Builder<K, V1, V2> setAllowedLateness(Duration value);
+
+    abstract EventTimeEquiJoin<K, V1, V2> build();
+  }
+
+  /**
+   * Returns a {@code Impl<K, V1, V2>} {@code PTransform} that matches elements from the first and
+   * second collection with the same keys if their timestamps are within the given interval.
+   *
+   * @param interval the allowed difference between the timestamps to allow a match
+   */
+  public EventTimeEquiJoin<K, V1, V2> within(Duration interval) {
+    return toBuilder()
+        .setFirstCollectionValidFor(interval)
+        .setSecondCollectionValidFor(interval)
+        .build();
+  }
+
+  /**
+   * Returns a {@code Impl<K, V1, V2>} {@code PTransform} that matches elements from the first and
+   * second collection with the same keys if the collection's element comes within the valid time
+   * range for the other collection.
+   *
+   * @param firstCollectionValidFor the valid time range for the first collection
+   * @param secondCollectionValidFor the valid time range for the second collection
+   */
+  public EventTimeEquiJoin<K, V1, V2> within(
+      Duration firstCollectionValidFor, Duration secondCollectionValidFor) {
+    return toBuilder()
+        .setFirstCollectionValidFor(firstCollectionValidFor)
+        .setSecondCollectionValidFor(secondCollectionValidFor)
+        .build();
+  }
+
+  /**
+   * Returns a {@code Impl<K, V1, V2>} {@code PTransform} that matches elements from the first and
+   * second collection with the same keys and allows for late elements
+   *
+   * @param allowedLateness the amount of time late elements are allowed.
+   */
+  public EventTimeEquiJoin<K, V1, V2> withAllowedLateness(Duration allowedLateness) {
+    return toBuilder().setAllowedLateness(allowedLateness).build();
+  }
+
+  @Override
+  public PCollection<KV<K, Pair<V1, V2>>> expand(PCollection<KV<K, V1>> input) {
+    Coder<K> keyCoder = JoinUtils.getKeyCoder(input);
+    Coder<V1> firstValueCoder = JoinUtils.getValueCoder(input);
+    Coder<V2> secondValueCoder = JoinUtils.getValueCoder(getSecondCollection());
+    UnionCoder unionCoder = UnionCoder.of(ImmutableList.of(firstValueCoder, secondValueCoder));
+    KvCoder<K, RawUnionValue> kvCoder = KvCoder.of(JoinUtils.getKeyCoder(input), unionCoder);
+    PCollectionList<KV<K, RawUnionValue>> union =
+        PCollectionList.of(JoinUtils.makeUnionTable(0, input, kvCoder))
+            .and(JoinUtils.makeUnionTable(1, getSecondCollection(), kvCoder));
+    return union
+        .apply("Flatten", Flatten.pCollections())
+        .apply(
+            "Join",
+            ParDo.of(
+                new EventTimeEquiJoinDoFn<>(
+                    firstValueCoder,
+                    secondValueCoder,
+                    getFirstCollectionValidFor(),
+                    getSecondCollectionValidFor(),
+                    getAllowedLateness())))
+        .setCoder(KvCoder.of(keyCoder, PairCoder.<V1, V2>of(firstValueCoder, secondValueCoder)));
+  }
+
+  private static class EventTimeEquiJoinDoFn<K, V1, V2>
+      extends DoFn<KV<K, RawUnionValue>, KV<K, Pair<V1, V2>>> {
+    private static final int FIRST_TAG = 0;
+    private static final int SECOND_TAG = 1;
+
+    // Bucket cleanup timers into TIMER_BUCKET length buckets.
+    private static final long TIMER_BUCKET = Duration.standardMinutes(1).getMillis();
+
+    // How long elements in the first and second collection are valid (can be matched) for.
+    private final Duration firstCollectionValidFor;
+    private final Duration secondCollectionValidFor;
+
+    // How long past the watermark that late elements can show up.
+    private final Duration allowedLateness;
+
+    @StateId("v1Items")
+    private final StateSpec<OrderedListState<V1>> firstCollectionItems;
+
+    @StateId("v2Items")
+    private final StateSpec<OrderedListState<V2>> secondCollectionItems;
+
+    @TimerFamily("cleanupTimers")
+    private final TimerSpec cleanupTimers = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
+
+    public EventTimeEquiJoinDoFn(
+        Coder<V1> firstValueCoder,
+        Coder<V2> secondValueCoder,
+        Duration firstValidFor,
+        Duration secondValidFor,
+        Duration allowedLateness) {
+      this.firstCollectionValidFor = firstValidFor;
+      this.secondCollectionValidFor = secondValidFor;
+      this.allowedLateness = allowedLateness;
+      firstCollectionItems = StateSpecs.orderedList(firstValueCoder);
+      secondCollectionItems = StateSpecs.orderedList(secondValueCoder);
+    }
+
+    @FunctionalInterface
+    private interface Output<T1, T2> {
+      void apply(T1 one, T2 two);
+    }
+
+    private <T, O> void processHelper(
+        Output<T, O> output,
+        KV<K, RawUnionValue> element,
+        Instant ts,
+        OrderedListState<T> thisCollection,
+        OrderedListState<O> otherCollection,
+        TimerMap cleanupTimers,
+        Duration thisCollectionValidFor,
+        Duration otherCollectionValidFor) {
+      thisCollection.add(TimestampedValue.of((T) element.getValue().getValue(), ts));
+      Instant beginning = ts.minus(otherCollectionValidFor);
+      Instant end = ts.plus(thisCollectionValidFor).plus(1L);
+      for (TimestampedValue<O> value : otherCollection.readRange(beginning, end)) {
+        output.apply((T) element.getValue().getValue(), value.getValue());
+      }
+      Instant cleanupTime = ts.plus(allowedLateness).plus(thisCollectionValidFor);

Review comment:
       Done.







[GitHub] [beam] tysonjh commented on a change in pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

Posted by GitBox <gi...@apache.org>.
tysonjh commented on a change in pull request #15275:
URL: https://github.com/apache/beam/pull/15275#discussion_r684372306



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/EventTimeEquiJoin.java
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.join;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.state.OrderedListState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.TimerMap;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} on two PCollections. A
+ * {@link EventTimeEquiJoin} joins elements with equal keys bounded by the difference in event time.
+ *
+ * <p>Example of performing a {@link EventTimeEquiJoin}:
+ *
+ * <pre>{@code
+ * PCollection<KV<K, V1>> pt1 = ...;
+ * PCollection<KV<K, V2>> pt2 = ...;
+ *
+ * PCollection<KV<K, Pair<V1, V2>>> eventTimeEquiJoinCollection =
+ *   pt1.apply(EventTimeEquiJoin.<K, V1, V2>of(pt2));
+ *
+ * @param secondCollection the second collection to use in the join.
+ * @param <K> the type of the keys in the input {@code PCollection}s
+ * @param <V1> the type of the value in the first {@code PCollection}
+ * @param <V2> the type of the value in the second {@code PCollection}
+ * </pre>
+ */
+@AutoValue
+public abstract class EventTimeEquiJoin<K, V1, V2>

Review comment:
       Some other options that were suggested previously:
   
   * TimestampBoundedEquijoin
   * EventTimeLimitedDurationEquiJoin
   * EventTimeScopedDurationInnerJoin
   







[GitHub] [beam] reuvenlax commented on pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15275:
URL: https://github.com/apache/beam/pull/15275#issuecomment-897814887


   Let me know when you're ready for another review round!





[GitHub] [beam] github-actions[bot] closed pull request #15275: [BEAM-7386] Adding EventTimeEquiJoin

Posted by GitBox <gi...@apache.org>.
github-actions[bot] closed pull request #15275:
URL: https://github.com/apache/beam/pull/15275


   

