You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/08/30 17:50:44 UTC

[GitHub] [beam] kw2542 opened a new pull request #15421: [BEAM-12823] TestStream Support in Samza Portable Runner

kw2542 opened a new pull request #15421:
URL: https://github.com/apache/beam/pull/15421


   TestStream Support in Samza Portable Runner
   
   **Please** add a meaningful description for your change here
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   `ValidatesRunner` compliance status (on master branch)
   --------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>Lang</th>
         <th>ULR</th>
         <th>Dataflow</th>
         <th>Flink</th>
         <th>Samza</th>
         <th>Spark</th>
         <th>Twister2</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>Go</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon">
           </a>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Samza/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
       </tr>
       <tr>
         <td>Java</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_ULR/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_ULR/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon?subject=V1">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming/lastCompletedBuild/badge/icon?subject=V1+Streaming">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon?subject=V1+Java+11">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2/lastCompletedBuild/badge/icon?subject=V2">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2_Streaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2_Streaming/lastCompletedBuild/badge/icon?subject=V2+Streaming">
           </a><br>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon?subject=Java+8">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon?subject=Java+11">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon?subject=Portable">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon?subject=Portable+Streaming">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Samza/lastCompletedBuild/badge/icon?subject=Portable">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon?subject=Portable">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon?subject=Structured+Streaming">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/badge/icon">
           </a>
         </td>
       </tr>
       <tr>
         <td>Python</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon?subject=V1">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon?subject=V2">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon?subject=ValCont">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/badge/icon?subject=Portable">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
       </tr>
       <tr>
         <td>XLang</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Dataflow/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Dataflow/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Samza/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
       </tr>
     </tbody>
   </table>
   
   Examples testing status on various runners
   --------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>Lang</th>
         <th>ULR</th>
         <th>Dataflow</th>
         <th>Flink</th>
         <th>Samza</th>
         <th>Spark</th>
         <th>Twister2</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>Go</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
       <tr>
         <td>Java</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Cron/lastCompletedBuild/badge/icon?subject=V1">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Java11_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Java11_Cron/lastCompletedBuild/badge/icon?subject=V1+Java11">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_Examples_Dataflow_V2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_Examples_Dataflow_V2/lastCompletedBuild/badge/icon?subject=V2">
           </a><br>
         </td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
       <tr>
         <td>Python</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
       <tr>
         <td>XLang</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
     </tbody>
   </table>
   
   Post-Commit SDK/Transform Integration Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>Go</th>
         <th>Java</th>
         <th>Python</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon?subject=3.6">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon?subject=3.7">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/badge/icon?subject=3.8">
           </a>
         </td>
       </tr>
     </tbody>
   </table>
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>---</th>
         <th>Java</th>
         <th>Python</th>
         <th>Go</th>
         <th>Website</th>
         <th>Whitespace</th>
         <th>Typescript</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>Non-portable</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon">
           </a><br>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon?subject=Tests">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon?subject=Lint">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/badge/icon?subject=Docker">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/badge/icon?subject=Docs">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
       </tr>
       <tr>
         <td>Portable</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_GoPortable_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_GoPortable_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
     </tbody>
   </table>
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
   
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kw2542 commented on pull request #15421: [BEAM-12823] TestStream Support in Samza Portable Runner

Posted by GitBox <gi...@apache.org>.
kw2542 commented on pull request #15421:
URL: https://github.com/apache/beam/pull/15421#issuecomment-909619267


   Run Python_PVR_Flink PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kw2542 commented on pull request #15421: [BEAM-12823] TestStream Support in Samza Portable Runner

Posted by GitBox <gi...@apache.org>.
kw2542 commented on pull request #15421:
URL: https://github.com/apache/beam/pull/15421#issuecomment-909789839


   Run Java Flink PortableValidatesRunner Streaming


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kw2542 commented on pull request #15421: [BEAM-12823] TestStream Support in Samza Portable Runner

Posted by GitBox <gi...@apache.org>.
kw2542 commented on pull request #15421:
URL: https://github.com/apache/beam/pull/15421#issuecomment-908840367


   Run Java Samza PortableValidatesRunner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kw2542 commented on a change in pull request #15421: [BEAM-12823] TestStream Support in Samza Portable Runner

Posted by GitBox <gi...@apache.org>.
kw2542 commented on a change in pull request #15421:
URL: https://github.com/apache/beam/pull/15421#discussion_r701312783



##########
File path: runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaTestStreamTranslator.java
##########
@@ -90,11 +95,53 @@ public void translate(
     ctx.registerInputMessageStream(output, inputDescriptor);
   }
 
+  @SuppressWarnings("unchecked")
   @Override
   public void translatePortable(
       PipelineNode.PTransformNode transform,
       QueryablePipeline pipeline,
       PortableTranslationContext ctx) {
-    throw new SamzaException("TestStream is not supported in portable by Samza runner");
+    final String outputId = ctx.getOutputId(transform);
+    final String escapedOutputId = SamzaPipelineTranslatorUtils.escape(outputId);
+    final GenericSystemDescriptor systemDescriptor =
+        new GenericSystemDescriptor(escapedOutputId, SamzaTestStreamSystemFactory.class.getName());
+    final ByteString bytes = transform.getTransform().getSpec().getPayload();
+    final RunnerApi.TestStreamPayload payload;
+    final Coder<T> coder;
+
+    try {
+      payload = RunnerApi.TestStreamPayload.parseFrom(bytes);
+      coder =
+          (Coder<T>)
+              RehydratedComponents.forComponents(pipeline.getComponents())
+                  .getCoder(payload.getCoderId());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    // the decoder for encodedTestStream
+    SerializableFunction<String, TestStream<?>> testStreamDecoder =
+        string -> {
+          try {
+            return TestStreamTranslation.testStreamFromProtoPayload(payload, coder);
+          } catch (IOException e) {
+            throw new SamzaException("Could not decode TestStream.", e);
+          }
+        };
+
+    final Map<String, String> systemConfig =
+        ImmutableMap.of(
+            "encodedTestStream",
+            Base64Serializer.serializeUnchecked(bytes),

Review comment:
       Updated the decoder function not to encode the input anymore.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kw2542 commented on pull request #15421: [BEAM-12823] TestStream Support in Samza Portable Runner

Posted by GitBox <gi...@apache.org>.
kw2542 commented on pull request #15421:
URL: https://github.com/apache/beam/pull/15421#issuecomment-909788577


   This PR does not change python precommit at all, and the failed tests failed with empty commit as well: https://github.com/apache/beam/pull/15433 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kw2542 removed a comment on pull request #15421: [BEAM-12823] TestStream Support in Samza Portable Runner

Posted by GitBox <gi...@apache.org>.
kw2542 removed a comment on pull request #15421:
URL: https://github.com/apache/beam/pull/15421#issuecomment-909546088


   Run Java Samza PortableValidatesRunner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kw2542 commented on a change in pull request #15421: [BEAM-12823] TestStream Support in Samza Portable Runner

Posted by GitBox <gi...@apache.org>.
kw2542 commented on a change in pull request #15421:
URL: https://github.com/apache/beam/pull/15421#discussion_r701313044



##########
File path: runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaTestStreamTranslator.java
##########
@@ -90,11 +95,53 @@ public void translate(
     ctx.registerInputMessageStream(output, inputDescriptor);
   }
 
+  @SuppressWarnings("unchecked")
   @Override
   public void translatePortable(
       PipelineNode.PTransformNode transform,
       QueryablePipeline pipeline,
       PortableTranslationContext ctx) {
-    throw new SamzaException("TestStream is not supported in portable by Samza runner");
+    final String outputId = ctx.getOutputId(transform);
+    final String escapedOutputId = SamzaPipelineTranslatorUtils.escape(outputId);
+    final GenericSystemDescriptor systemDescriptor =
+        new GenericSystemDescriptor(escapedOutputId, SamzaTestStreamSystemFactory.class.getName());
+    final ByteString bytes = transform.getTransform().getSpec().getPayload();
+    final RunnerApi.TestStreamPayload payload;
+    final Coder<T> coder;
+
+    try {
+      payload = RunnerApi.TestStreamPayload.parseFrom(bytes);
+      coder =
+          (Coder<T>)
+              RehydratedComponents.forComponents(pipeline.getComponents())
+                  .getCoder(payload.getCoderId());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    // the decoder for encodedTestStream
+    SerializableFunction<String, TestStream<?>> testStreamDecoder =
+        string -> {

Review comment:
       Updated




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kw2542 removed a comment on pull request #15421: [BEAM-12823] TestStream Support in Samza Portable Runner

Posted by GitBox <gi...@apache.org>.
kw2542 removed a comment on pull request #15421:
URL: https://github.com/apache/beam/pull/15421#issuecomment-908565107


   Run Java Samza PortableValidatesRunner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kw2542 commented on pull request #15421: [BEAM-12823] TestStream Support in Samza Portable Runner

Posted by GitBox <gi...@apache.org>.
kw2542 commented on pull request #15421:
URL: https://github.com/apache/beam/pull/15421#issuecomment-912078050


   Run Python Samza ValidatesRunner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kw2542 commented on pull request #15421: [BEAM-12823] TestStream Support in Samza Portable Runner

Posted by GitBox <gi...@apache.org>.
kw2542 commented on pull request #15421:
URL: https://github.com/apache/beam/pull/15421#issuecomment-910685880


   Run Python Samza ValidatesRunner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kw2542 commented on pull request #15421: [BEAM-12823] TestStream Support in Samza Portable Runner

Posted by GitBox <gi...@apache.org>.
kw2542 commented on pull request #15421:
URL: https://github.com/apache/beam/pull/15421#issuecomment-909598427


   Run Java Samza PortableValidatesRunner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kw2542 removed a comment on pull request #15421: [BEAM-12823] TestStream Support in Samza Portable Runner

Posted by GitBox <gi...@apache.org>.
kw2542 removed a comment on pull request #15421:
URL: https://github.com/apache/beam/pull/15421#issuecomment-909619267






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] xinyuiscool commented on a change in pull request #15421: [BEAM-12823] TestStream Support in Samza Portable Runner

Posted by GitBox <gi...@apache.org>.
xinyuiscool commented on a change in pull request #15421:
URL: https://github.com/apache/beam/pull/15421#discussion_r701406684



##########
File path: runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaTestStreamTranslator.java
##########
@@ -90,11 +97,54 @@ public void translate(
     ctx.registerInputMessageStream(output, inputDescriptor);
   }
 
+  @SuppressWarnings("unchecked")
   @Override
   public void translatePortable(
       PipelineNode.PTransformNode transform,
       QueryablePipeline pipeline,
       PortableTranslationContext ctx) {
-    throw new SamzaException("TestStream is not supported in portable by Samza runner");
+    final ByteString bytes = transform.getTransform().getSpec().getPayload();
+    final Coder<T> coder;
+
+    try {
+      coder =
+          (Coder<T>)
+              RehydratedComponents.forComponents(pipeline.getComponents())
+                  .getCoder(RunnerApi.TestStreamPayload.parseFrom(bytes).getCoderId());
+    } catch (IOException e) {
+      throw new SamzaException(e);
+    }
+
+    // the decoder for encodedTestStream
+    SerializableFunction<String, TestStream<?>> testStreamDecoder =
+        string -> {

Review comment:
       Let's have a meaningful name for string. Thx!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kw2542 commented on pull request #15421: [BEAM-12823] TestStream Support in Samza Portable Runner

Posted by GitBox <gi...@apache.org>.
kw2542 commented on pull request #15421:
URL: https://github.com/apache/beam/pull/15421#issuecomment-908555621


   Run Java Samza PortableValidatesRunner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kw2542 removed a comment on pull request #15421: [BEAM-12823] TestStream Support in Samza Portable Runner

Posted by GitBox <gi...@apache.org>.
kw2542 removed a comment on pull request #15421:
URL: https://github.com/apache/beam/pull/15421#issuecomment-909657159






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kw2542 commented on pull request #15421: [BEAM-12823] TestStream Support in Samza Portable Runner

Posted by GitBox <gi...@apache.org>.
kw2542 commented on pull request #15421:
URL: https://github.com/apache/beam/pull/15421#issuecomment-909790457


   Run Direct ValidatesRunner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kw2542 removed a comment on pull request #15421: [BEAM-12823] TestStream Support in Samza Portable Runner

Posted by GitBox <gi...@apache.org>.
kw2542 removed a comment on pull request #15421:
URL: https://github.com/apache/beam/pull/15421#issuecomment-908840367






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kw2542 commented on pull request #15421: [BEAM-12823] TestStream Support in Samza Portable Runner

Posted by GitBox <gi...@apache.org>.
kw2542 commented on pull request #15421:
URL: https://github.com/apache/beam/pull/15421#issuecomment-908555621






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] xinyuiscool commented on a change in pull request #15421: [BEAM-12823] TestStream Support in Samza Portable Runner

Posted by GitBox <gi...@apache.org>.
xinyuiscool commented on a change in pull request #15421:
URL: https://github.com/apache/beam/pull/15421#discussion_r701406444



##########
File path: runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaTestStreamTranslator.java
##########
@@ -90,11 +97,54 @@ public void translate(
     ctx.registerInputMessageStream(output, inputDescriptor);
   }
 
+  @SuppressWarnings("unchecked")
   @Override
   public void translatePortable(
       PipelineNode.PTransformNode transform,
       QueryablePipeline pipeline,
       PortableTranslationContext ctx) {
-    throw new SamzaException("TestStream is not supported in portable by Samza runner");
+    final ByteString bytes = transform.getTransform().getSpec().getPayload();
+    final Coder<T> coder;
+
+    try {
+      coder =
+          (Coder<T>)
+              RehydratedComponents.forComponents(pipeline.getComponents())
+                  .getCoder(RunnerApi.TestStreamPayload.parseFrom(bytes).getCoderId());
+    } catch (IOException e) {
+      throw new SamzaException(e);

Review comment:
       Let's throw a RuntimeException() here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kw2542 commented on pull request #15421: [BEAM-12823] TestStream Support in Samza Portable Runner

Posted by GitBox <gi...@apache.org>.
kw2542 commented on pull request #15421:
URL: https://github.com/apache/beam/pull/15421#issuecomment-909788769


   Run Python Samza ValidatesRunner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kw2542 commented on pull request #15421: [BEAM-12823] TestStream Support in Samza Portable Runner

Posted by GitBox <gi...@apache.org>.
kw2542 commented on pull request #15421:
URL: https://github.com/apache/beam/pull/15421#issuecomment-908565107


   Run Java Samza PortableValidatesRunner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kw2542 commented on pull request #15421: [BEAM-12823] TestStream Support in Samza Portable Runner

Posted by GitBox <gi...@apache.org>.
kw2542 commented on pull request #15421:
URL: https://github.com/apache/beam/pull/15421#issuecomment-912078250


   Run Samza ValidatesRunner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kw2542 commented on pull request #15421: [BEAM-12823] TestStream Support in Samza Portable Runner

Posted by GitBox <gi...@apache.org>.
kw2542 commented on pull request #15421:
URL: https://github.com/apache/beam/pull/15421#issuecomment-909546088


   Run Java Samza PortableValidatesRunner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kw2542 commented on pull request #15421: [BEAM-12823] TestStream Support in Samza Portable Runner

Posted by GitBox <gi...@apache.org>.
kw2542 commented on pull request #15421:
URL: https://github.com/apache/beam/pull/15421#issuecomment-909789409


   @xinyuiscool @Zhangyx39 Could you help take a look ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kw2542 commented on pull request #15421: [BEAM-12823] TestStream Support in Samza Portable Runner

Posted by GitBox <gi...@apache.org>.
kw2542 commented on pull request #15421:
URL: https://github.com/apache/beam/pull/15421#issuecomment-910737448






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kw2542 commented on pull request #15421: [BEAM-12823] TestStream Support in Samza Portable Runner

Posted by GitBox <gi...@apache.org>.
kw2542 commented on pull request #15421:
URL: https://github.com/apache/beam/pull/15421#issuecomment-909546088






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kw2542 removed a comment on pull request #15421: [BEAM-12823] TestStream Support in Samza Portable Runner

Posted by GitBox <gi...@apache.org>.
kw2542 removed a comment on pull request #15421:
URL: https://github.com/apache/beam/pull/15421#issuecomment-908555621






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] xinyuiscool commented on a change in pull request #15421: [BEAM-12823] TestStream Support in Samza Portable Runner

Posted by GitBox <gi...@apache.org>.
xinyuiscool commented on a change in pull request #15421:
URL: https://github.com/apache/beam/pull/15421#discussion_r701405816



##########
File path: runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaTestStreamTranslator.java
##########
@@ -90,11 +97,54 @@ public void translate(
     ctx.registerInputMessageStream(output, inputDescriptor);
   }
 
+  @SuppressWarnings("unchecked")
   @Override
   public void translatePortable(
       PipelineNode.PTransformNode transform,
       QueryablePipeline pipeline,
       PortableTranslationContext ctx) {
-    throw new SamzaException("TestStream is not supported in portable by Samza runner");
+    final ByteString bytes = transform.getTransform().getSpec().getPayload();
+    final Coder<T> coder;

Review comment:
       Shall we wrap line 107 - 129 into a private static method, e.g. createTestStreamDecoder()? The good thing about static is that your lambda function instance won't hold on to the outer class instance.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kw2542 commented on pull request #15421: [BEAM-12823] TestStream Support in Samza Portable Runner

Posted by GitBox <gi...@apache.org>.
kw2542 commented on pull request #15421:
URL: https://github.com/apache/beam/pull/15421#issuecomment-909788853


   Run XVR_Samza PostCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kw2542 commented on pull request #15421: [BEAM-12823] TestStream Support in Samza Portable Runner

Posted by GitBox <gi...@apache.org>.
kw2542 commented on pull request #15421:
URL: https://github.com/apache/beam/pull/15421#issuecomment-909598547






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kw2542 commented on pull request #15421: [BEAM-12823] TestStream Support in Samza Portable Runner

Posted by GitBox <gi...@apache.org>.
kw2542 commented on pull request #15421:
URL: https://github.com/apache/beam/pull/15421#issuecomment-910733697


   Cross Language Tests failed because of BEAM-12805


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kw2542 commented on pull request #15421: [BEAM-12823] TestStream Support in Samza Portable Runner

Posted by GitBox <gi...@apache.org>.
kw2542 commented on pull request #15421:
URL: https://github.com/apache/beam/pull/15421#issuecomment-910492717


   Samza CrossLanguageValidatesRunner Tests & Python Samza ValidatesRunner Tests are currently failing on master as well:
   
   - https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza/lastCompletedBuild/
   - https://ci-beam.apache.org/job/beam_PostCommit_XVR_Samza/lastCompletedBuild/


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] xinyuiscool merged pull request #15421: [BEAM-12823] TestStream Support in Samza Portable Runner

Posted by GitBox <gi...@apache.org>.
xinyuiscool merged pull request #15421:
URL: https://github.com/apache/beam/pull/15421


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kw2542 commented on pull request #15421: [BEAM-12823] TestStream Support in Samza Portable Runner

Posted by GitBox <gi...@apache.org>.
kw2542 commented on pull request #15421:
URL: https://github.com/apache/beam/pull/15421#issuecomment-909657251


   Run Portable_Python PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kw2542 removed a comment on pull request #15421: [BEAM-12823] TestStream Support in Samza Portable Runner

Posted by GitBox <gi...@apache.org>.
kw2542 removed a comment on pull request #15421:
URL: https://github.com/apache/beam/pull/15421#issuecomment-908555621


   Run Java Samza PortableValidatesRunner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kw2542 commented on pull request #15421: [BEAM-12823] TestStream Support in Samza Portable Runner

Posted by GitBox <gi...@apache.org>.
kw2542 commented on pull request #15421:
URL: https://github.com/apache/beam/pull/15421#issuecomment-912077957


   Run Java Samza PortableValidatesRunner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] xinyuiscool commented on a change in pull request #15421: [BEAM-12823] TestStream Support in Samza Portable Runner

Posted by GitBox <gi...@apache.org>.
xinyuiscool commented on a change in pull request #15421:
URL: https://github.com/apache/beam/pull/15421#discussion_r701258082



##########
File path: runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaTestStreamTranslator.java
##########
@@ -90,11 +95,53 @@ public void translate(
     ctx.registerInputMessageStream(output, inputDescriptor);
   }
 
+  @SuppressWarnings("unchecked")
   @Override
   public void translatePortable(
       PipelineNode.PTransformNode transform,
       QueryablePipeline pipeline,
       PortableTranslationContext ctx) {
-    throw new SamzaException("TestStream is not supported in portable by Samza runner");
+    final String outputId = ctx.getOutputId(transform);
+    final String escapedOutputId = SamzaPipelineTranslatorUtils.escape(outputId);
+    final GenericSystemDescriptor systemDescriptor =
+        new GenericSystemDescriptor(escapedOutputId, SamzaTestStreamSystemFactory.class.getName());
+    final ByteString bytes = transform.getTransform().getSpec().getPayload();
+    final RunnerApi.TestStreamPayload payload;
+    final Coder<T> coder;
+
+    try {
+      payload = RunnerApi.TestStreamPayload.parseFrom(bytes);
+      coder =
+          (Coder<T>)
+              RehydratedComponents.forComponents(pipeline.getComponents())
+                  .getCoder(payload.getCoderId());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    // the decoder for encodedTestStream
+    SerializableFunction<String, TestStream<?>> testStreamDecoder =
+        string -> {
+          try {
+            return TestStreamTranslation.testStreamFromProtoPayload(payload, coder);
+          } catch (IOException e) {
+            throw new SamzaException("Could not decode TestStream.", e);
+          }
+        };
+
+    final Map<String, String> systemConfig =
+        ImmutableMap.of(
+            "encodedTestStream",
+            Base64Serializer.serializeUnchecked(bytes),

Review comment:
       Is this being used? Seems the decoder function is already serialized with payload and actual coder, so not sure whether we need to pass in it here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kw2542 commented on pull request #15421: [BEAM-12823] TestStream Support in Samza Portable Runner

Posted by GitBox <gi...@apache.org>.
kw2542 commented on pull request #15421:
URL: https://github.com/apache/beam/pull/15421#issuecomment-909657159


   Run Python_PVR_Flink PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kw2542 commented on pull request #15421: [BEAM-12823] TestStream Support in Samza Portable Runner

Posted by GitBox <gi...@apache.org>.
kw2542 commented on pull request #15421:
URL: https://github.com/apache/beam/pull/15421#issuecomment-912096539


   Run Java PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kw2542 removed a comment on pull request #15421: [BEAM-12823] TestStream Support in Samza Portable Runner

Posted by GitBox <gi...@apache.org>.
kw2542 removed a comment on pull request #15421:
URL: https://github.com/apache/beam/pull/15421#issuecomment-908840367


   Run Java Samza PortableValidatesRunner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] xinyuiscool commented on a change in pull request #15421: [BEAM-12823] TestStream Support in Samza Portable Runner

Posted by GitBox <gi...@apache.org>.
xinyuiscool commented on a change in pull request #15421:
URL: https://github.com/apache/beam/pull/15421#discussion_r701259003



##########
File path: runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaTestStreamTranslator.java
##########
@@ -90,11 +95,53 @@ public void translate(
     ctx.registerInputMessageStream(output, inputDescriptor);
   }
 
+  @SuppressWarnings("unchecked")
   @Override
   public void translatePortable(
       PipelineNode.PTransformNode transform,
       QueryablePipeline pipeline,
       PortableTranslationContext ctx) {
-    throw new SamzaException("TestStream is not supported in portable by Samza runner");
+    final String outputId = ctx.getOutputId(transform);
+    final String escapedOutputId = SamzaPipelineTranslatorUtils.escape(outputId);
+    final GenericSystemDescriptor systemDescriptor =
+        new GenericSystemDescriptor(escapedOutputId, SamzaTestStreamSystemFactory.class.getName());
+    final ByteString bytes = transform.getTransform().getSpec().getPayload();
+    final RunnerApi.TestStreamPayload payload;
+    final Coder<T> coder;
+
+    try {
+      payload = RunnerApi.TestStreamPayload.parseFrom(bytes);
+      coder =
+          (Coder<T>)
+              RehydratedComponents.forComponents(pipeline.getComponents())
+                  .getCoder(payload.getCoderId());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    // the decoder for encodedTestStream
+    SerializableFunction<String, TestStream<?>> testStreamDecoder =
+        string -> {

Review comment:
       string seems not used?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org