Posted to commits@beam.apache.org by lo...@apache.org on 2023/02/20 00:57:22 UTC

[beam] branch prism-jobservices created (now da308835076)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a change to branch prism-jobservices
in repository https://gitbox.apache.org/repos/asf/beam.git


      at da308835076 [prism] minimum required job services

This branch includes the following new commits:

     new 0dc99edc01c Stop use of self hosted runners for some workflows. (#25542)
     new b24e3850100 Run prbot updates on hosted runners (#25544)
     new 03e12fb9d82 prism-fixstatic (#25546)
     new efc1d3629c2 Update Go SDK minimum Go version to 1.19 (#25545)
     new 2cef59549dd Replace more uses of `ClassLoadingStrategy.Default.INJECTION` (#23210)
     new 0dd240529d1 Use WindowedValue.withValue on hot paths #21250 (#25519)
     new 3e321d15934 Stop paying the iterator object creation tax in MultiplexingMetricTrackingFnDataReceiver (#25540)
     new 9c6eb255411 [prism] add windowing strategy (#25518)
     new d7e87949649 Fix Tensorflow integration test model path (#25553)
     new 1294ed9ba73 [Go SDK]: Retrieve file size in CreateInitialRestriction in textio.Read (#25535)
     new 7bb493edde2 [#24789][prism] internal/worker + tentative data (#25478)
     new 2cb402ad92a [#24789][prism] add preprocessor and test (#25520)
     new da308835076 [prism] minimum required job services

The 13 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[beam] 06/13: Use WindowedValue.withValue on hot paths #21250 (#25519)

Posted by lo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch prism-jobservices
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 0dd240529d120c49155a0eb88274a0e9f2047091
Author: Luke Cwik <lu...@gmail.com>
AuthorDate: Fri Feb 17 16:58:29 2023 -0800

    Use WindowedValue.withValue on hot paths #21250 (#25519)
    
    * Use WindowedValue.withValue on hot paths #21250
    
    This removed about half of the overhead of outputting a value in the common scenario where we are already using a valid timestamp (the input timestamp), since we can then take the `withValue` hot path, which is optimized for certain use cases (e.g. the globally windowed value case).
    
    Before:
    ```
    Benchmark                                Mode  Cnt     Score     Error  Units
    ProcessBundleBenchmark.testLargeBundle  thrpt   15  3616.761 ± 157.844  ops/s
    ```
    
    After:
    ```
    Benchmark                                Mode  Cnt     Score     Error  Units
    ProcessBundleBenchmark.testLargeBundle  thrpt   15  3666.889 ± 151.448  ops/s
    ```
    
    This is for #21250.
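    
    For illustration only, a minimal sketch of the two output paths this change chooses between. It is not part of the commit; the helper class and method names are hypothetical, and it relies only on `WindowedValue` methods (`getTimestamp`, `getWindows`, `getPane`, `withValue`) that appear in the diff below.
    
    ```java
    import org.apache.beam.sdk.util.WindowedValue;
    
    // Hypothetical helper contrasting the two ways of emitting an output element
    // that keeps the input element's timestamp, windows, and pane.
    class WithValueSketch<InT, OutT> {
      // Slower path: build a brand-new WindowedValue, re-supplying all windowing metadata.
      WindowedValue<OutT> viaOf(WindowedValue<InT> in, OutT out) {
        return WindowedValue.of(out, in.getTimestamp(), in.getWindows(), in.getPane());
      }
    
      // Hot path: reuse the input's windowing metadata; the globally windowed and
      // single-window representations keep this cheap.
      WindowedValue<OutT> viaWithValue(WindowedValue<InT> in, OutT out) {
        return in.withValue(out);
      }
    }
    ```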
---
 .../apache/beam/fn/harness/FnApiDoFnRunner.java    | 257 +++++++++++++++++----
 1 file changed, 209 insertions(+), 48 deletions(-)

diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 0cfcb0a84f2..561bb0f39fd 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -2165,7 +2165,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
   }
 
   /** Provides arguments for a {@link DoFnInvoker} for a window observing method. */
-  private class WindowObservingProcessBundleContext extends ProcessBundleContextBase {
+  private abstract class WindowObservingProcessBundleContextBase extends ProcessBundleContextBase {
     @Override
     public BoundedWindow window() {
       return currentWindow;
@@ -2180,6 +2180,53 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
     public <T> T sideInput(PCollectionView<T> view) {
       return stateAccessor.get(view, currentWindow);
     }
+  }
+
+  private class WindowObservingProcessBundleContext
+      extends WindowObservingProcessBundleContextBase {
+
+    @Override
+    public void output(OutputT output) {
+      // Don't need to check timestamp since we can always output using the input timestamp.
+      outputTo(
+          mainOutputConsumer,
+          WindowedValue.of(
+              output, currentElement.getTimestamp(), currentWindow, currentElement.getPane()));
+    }
+
+    @Override
+    public <T> void output(TupleTag<T> tag, T output) {
+      FnDataReceiver<WindowedValue<T>> consumer =
+          (FnDataReceiver) localNameToConsumer.get(tag.getId());
+      if (consumer == null) {
+        throw new IllegalArgumentException(String.format("Unknown output tag %s", tag));
+      }
+      // Don't need to check timestamp since we can always output using the input timestamp.
+      outputTo(
+          consumer,
+          WindowedValue.of(
+              output, currentElement.getTimestamp(), currentWindow, currentElement.getPane()));
+    }
+
+    @Override
+    public void outputWithTimestamp(OutputT output, Instant timestamp) {
+      // TODO: Check that timestamp is valid once all runners can provide proper timestamps.
+      outputTo(
+          mainOutputConsumer,
+          WindowedValue.of(output, timestamp, currentWindow, currentElement.getPane()));
+    }
+
+    @Override
+    public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+      // TODO: Check that timestamp is valid once all runners can provide proper timestamps.
+      FnDataReceiver<WindowedValue<T>> consumer =
+          (FnDataReceiver) localNameToConsumer.get(tag.getId());
+      if (consumer == null) {
+        throw new IllegalArgumentException(String.format("Unknown output tag %s", tag));
+      }
+      outputTo(
+          consumer, WindowedValue.of(output, timestamp, currentWindow, currentElement.getPane()));
+    }
 
     @Override
     public State state(String stateId, boolean alwaysFetched) {
@@ -2232,37 +2279,62 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
           currentElement.getTimestamp(),
           currentElement.getPane());
     }
-
-    @Override
-    public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      // TODO: Check that timestamp is valid once all runners can provide proper timestamps.
-      outputTo(
-          mainOutputConsumer,
-          WindowedValue.of(output, timestamp, currentWindow, currentElement.getPane()));
-    }
-
-    @Override
-    public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      // TODO: Check that timestamp is valid once all runners can provide proper timestamps.
-      FnDataReceiver<WindowedValue<T>> consumer =
-          (FnDataReceiver) localNameToConsumer.get(tag.getId());
-      if (consumer == null) {
-        throw new IllegalArgumentException(String.format("Unknown output tag %s", tag));
-      }
-      outputTo(
-          consumer, WindowedValue.of(output, timestamp, currentWindow, currentElement.getPane()));
-    }
   }
 
   /** This context outputs KV<KV<Element, KV<Restriction, WatemarkEstimatorState>>, Size>. */
   private class SizedRestrictionWindowObservingProcessBundleContext
-      extends WindowObservingProcessBundleContext {
+      extends WindowObservingProcessBundleContextBase {
     private final String errorContextPrefix;
 
     SizedRestrictionWindowObservingProcessBundleContext(String errorContextPrefix) {
       this.errorContextPrefix = errorContextPrefix;
     }
 
+    @Override
+    // OutputT == RestrictionT
+    public void output(OutputT output) {
+      double size =
+          doFnInvoker.invokeGetSize(
+              new DelegatingArgumentProvider<InputT, OutputT>(
+                  this, this.errorContextPrefix + "/GetSize") {
+                @Override
+                public Object restriction() {
+                  return output;
+                }
+
+                @Override
+                public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+                  return currentElement.getTimestamp();
+                }
+
+                @Override
+                public RestrictionTracker<?, ?> restrictionTracker() {
+                  return doFnInvoker.invokeNewTracker(this);
+                }
+              });
+
+      // Don't need to check timestamp since we can always output using the input timestamp.
+      outputTo(
+          mainOutputConsumer,
+          (WindowedValue<OutputT>)
+              WindowedValue.of(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(), KV.of(output, currentWatermarkEstimatorState)),
+                      size),
+                  currentElement.getTimestamp(),
+                  currentWindow,
+                  currentElement.getPane()));
+    }
+
+    @Override
+    public <T> void output(TupleTag<T> tag, T output) {
+      // Note that the OutputReceiver/RowOutputReceiver specifically will use the non-tag versions
+      // of these methods when producing output.
+      throw new UnsupportedOperationException(
+          String.format("Non-main output %s unsupported in %s", tag, errorContextPrefix));
+    }
+
     @Override
     // OutputT == RestrictionT
     public void outputWithTimestamp(OutputT output, Instant timestamp) {
@@ -2299,17 +2371,85 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
                   currentWindow,
                   currentElement.getPane()));
     }
+
+    @Override
+    public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+      // Note that the OutputReceiver/RowOutputReceiver specifically will use the non-tag versions
+      // of these methods when producing output.
+      throw new UnsupportedOperationException(
+          String.format("Non-main output %s unsupported in %s", tag, errorContextPrefix));
+    }
+
+    @Override
+    public State state(String stateId, boolean alwaysFetched) {
+      throw new UnsupportedOperationException(
+          String.format("State unsupported in %s", errorContextPrefix));
+    }
+
+    @Override
+    public org.apache.beam.sdk.state.Timer timer(String timerId) {
+      throw new UnsupportedOperationException(
+          String.format("Timer unsupported in %s", errorContextPrefix));
+    }
+
+    @Override
+    public TimerMap timerFamily(String tagId) {
+      throw new UnsupportedOperationException(
+          String.format("Timer unsupported in %s", errorContextPrefix));
+    }
   }
 
   /** This context outputs KV<KV<Element, KV<Restriction, WatermarkEstimatorState>>, Size>. */
   private class SizedRestrictionNonWindowObservingProcessBundleContext
-      extends NonWindowObservingProcessBundleContext {
+      extends NonWindowObservingProcessBundleContextBase {
     private final String errorContextPrefix;
 
     SizedRestrictionNonWindowObservingProcessBundleContext(String errorContextPrefix) {
       this.errorContextPrefix = errorContextPrefix;
     }
 
+    @Override
+    // OutputT == RestrictionT
+    public void output(OutputT output) {
+      double size =
+          doFnInvoker.invokeGetSize(
+              new DelegatingArgumentProvider<InputT, OutputT>(
+                  this, errorContextPrefix + "/GetSize") {
+                @Override
+                public Object restriction() {
+                  return output;
+                }
+
+                @Override
+                public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+                  return currentElement.getTimestamp();
+                }
+
+                @Override
+                public RestrictionTracker<?, ?> restrictionTracker() {
+                  return doFnInvoker.invokeNewTracker(this);
+                }
+              });
+
+      // Don't need to check timestamp since we can always output using the input timestamp.
+      outputTo(
+          mainOutputConsumer,
+          (WindowedValue<OutputT>)
+              currentElement.withValue(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(), KV.of(output, currentWatermarkEstimatorState)),
+                      size)));
+    }
+
+    @Override
+    public <T> void output(TupleTag<T> tag, T output) {
+      // Note that the OutputReceiver/RowOutputReceiver specifically will use the non-tag versions
+      // of these methods when producing output.
+      throw new UnsupportedOperationException(
+          String.format("Non-main output %s unsupported in %s", tag, errorContextPrefix));
+    }
+
     @Override
     // OutputT == RestrictionT
     public void outputWithTimestamp(OutputT output, Instant timestamp) {
@@ -2346,10 +2486,37 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
                   currentElement.getWindows(),
                   currentElement.getPane()));
     }
+
+    @Override
+    public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+      // Note that the OutputReceiver/RowOutputReceiver specifically will use the non-tag versions
+      // of these methods when producing output.
+      throw new UnsupportedOperationException(
+          String.format("Non-main output %s unsupported in %s", tag, errorContextPrefix));
+    }
   }
 
   /** Provides arguments for a {@link DoFnInvoker} for a non-window observing method. */
-  private class NonWindowObservingProcessBundleContext extends ProcessBundleContextBase {
+  private class NonWindowObservingProcessBundleContext
+      extends NonWindowObservingProcessBundleContextBase {
+
+    @Override
+    public void output(OutputT output) {
+      // Don't need to check timestamp since we can always output using the input timestamp.
+      outputTo(mainOutputConsumer, currentElement.withValue(output));
+    }
+
+    @Override
+    public <T> void output(TupleTag<T> tag, T output) {
+      FnDataReceiver<WindowedValue<T>> consumer =
+          (FnDataReceiver) localNameToConsumer.get(tag.getId());
+      if (consumer == null) {
+        throw new IllegalArgumentException(String.format("Unknown output tag %s", tag));
+      }
+      // Don't need to check timestamp since we can always output using the input timestamp.
+      outputTo(consumer, currentElement.withValue(output));
+    }
+
     @Override
     public void outputWithTimestamp(OutputT output, Instant timestamp) {
       checkTimestamp(timestamp);
@@ -2372,7 +2539,11 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
           WindowedValue.of(
               output, timestamp, currentElement.getWindows(), currentElement.getPane()));
     }
+  }
 
+  /** Provides base arguments for a {@link DoFnInvoker} for a non-window observing method. */
+  private abstract class NonWindowObservingProcessBundleContextBase
+      extends ProcessBundleContextBase {
     @Override
     public BoundedWindow window() {
       throw new UnsupportedOperationException(
@@ -2489,8 +2660,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
 
               @Override
               public void output(Row output) {
-                ProcessBundleContextBase.this.outputWithTimestamp(
-                    fromRowFunction.apply(output), currentElement.getTimestamp());
+                ProcessBundleContextBase.this.output(fromRowFunction.apply(output));
               }
 
               @Override
@@ -2517,14 +2687,16 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
           private final Map<TupleTag<?>, OutputReceiver<Row>> taggedRowReceivers = new HashMap<>();
 
           private <T> OutputReceiver<T> createTaggedOutputReceiver(TupleTag<T> tag) {
+            // Note that it is important that we use the non-tag versions here when using the main
+            // output tag for performance reasons and we also rely on it for the splittable DoFn
+            // context objects as well.
             if (tag == null || mainOutputTag.equals(tag)) {
               return (OutputReceiver<T>) ProcessBundleContextBase.this;
             }
             return new OutputReceiver<T>() {
               @Override
               public void output(T output) {
-                ProcessBundleContextBase.this.outputWithTimestamp(
-                    tag, output, currentElement.getTimestamp());
+                ProcessBundleContextBase.this.output(tag, output);
               }
 
               @Override
@@ -2535,6 +2707,9 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
           }
 
           private <T> OutputReceiver<Row> createTaggedRowReceiver(TupleTag<T> tag) {
+            // Note that it is important that we use the non-tag versions here when using the main
+            // output tag for performance reasons and we also rely on it for the splittable DoFn
+            // context objects as well.
             if (tag == null || mainOutputTag.equals(tag)) {
               checkState(
                   mainOutputSchemaCoder != null,
@@ -2555,8 +2730,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
 
               @Override
               public void output(Row output) {
-                ProcessBundleContextBase.this.outputWithTimestamp(
-                    tag, fromRowFunction.apply(output), currentElement.getTimestamp());
+                ProcessBundleContextBase.this.output(tag, fromRowFunction.apply(output));
               }
 
               @Override
@@ -2615,16 +2789,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
       return pipelineOptions;
     }
 
-    @Override
-    public void output(OutputT output) {
-      outputWithTimestamp(output, currentElement.getTimestamp());
-    }
-
-    @Override
-    public <T> void output(TupleTag<T> tag, T output) {
-      outputWithTimestamp(tag, output, currentElement.getTimestamp());
-    }
-
     @Override
     public InputT element() {
       return currentElement.getValue();
@@ -2777,8 +2941,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
 
               @Override
               public void output(Row output) {
-                context.outputWithTimestamp(
-                    fromRowFunction.apply(output), currentElement.getTimestamp());
+                context.output(fromRowFunction.apply(output));
               }
 
               @Override
@@ -2810,7 +2973,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
             return new OutputReceiver<T>() {
               @Override
               public void output(T output) {
-                context.outputWithTimestamp(tag, output, currentElement.getTimestamp());
+                context.output(tag, output);
               }
 
               @Override
@@ -2841,8 +3004,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
 
               @Override
               public void output(Row output) {
-                context.outputWithTimestamp(
-                    tag, fromRowFunction.apply(output), currentElement.getTimestamp());
+                context.output(tag, fromRowFunction.apply(output));
               }
 
               @Override
@@ -3071,7 +3233,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
             return new OutputReceiver<T>() {
               @Override
               public void output(T output) {
-                context.outputWithTimestamp(tag, output, currentElement.getTimestamp());
+                context.output(tag, output);
               }
 
               @Override
@@ -3102,8 +3264,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
 
               @Override
               public void output(Row output) {
-                context.outputWithTimestamp(
-                    tag, fromRowFunction.apply(output), currentElement.getTimestamp());
+                context.output(tag, fromRowFunction.apply(output));
               }
 
               @Override


[beam] 04/13: Update Go SDK minimum Go version to 1.19 (#25545)

Posted by lo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch prism-jobservices
in repository https://gitbox.apache.org/repos/asf/beam.git

commit efc1d3629c29431338c482e448a743d7c20d9842
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Fri Feb 17 14:47:32 2023 -0800

    Update Go SDK minimum Go version to 1.19 (#25545)
    
    * Update Go SDK version to 1.19
    
    * update CHANGES.md
    
    * Update docs.
    
    * staticheckfix
    
    ---------
    
    Co-authored-by: lostluck <13...@users.noreply.github.com>
---
 .github/actions/setup-self-hosted-action/action.yml               | 8 ++++----
 .github/workflows/build_playground_frontend.yml                   | 2 +-
 .github/workflows/go_tests.yml                                    | 2 +-
 .github/workflows/playground_deploy_backend.yml                   | 2 +-
 .github/workflows/python_tests.yml                                | 2 +-
 CHANGES.md                                                        | 1 +
 .../main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy    | 2 +-
 dev-support/docker/Dockerfile                                     | 2 +-
 sdks/go.mod                                                       | 2 +-
 website/www/site/content/en/documentation/programming-guide.md    | 2 +-
 website/www/site/content/en/get-started/quickstart-go.md          | 2 +-
 11 files changed, 14 insertions(+), 13 deletions(-)

diff --git a/.github/actions/setup-self-hosted-action/action.yml b/.github/actions/setup-self-hosted-action/action.yml
index 27469359e26..430a7e1d828 100644
--- a/.github/actions/setup-self-hosted-action/action.yml
+++ b/.github/actions/setup-self-hosted-action/action.yml
@@ -34,9 +34,9 @@ inputs:
     required: false
     description: 'Set as false if does not require java-8 setup'
     default: 'true'
-  requires-go-18:
+  requires-go-19:
     required: false
-    description: 'Set as false if does not require go-18 setup'
+    description: 'Set as false if does not require go-19 setup'
     default: 'true'
 
 runs:
@@ -64,7 +64,7 @@ runs:
         distribution: 'temurin'
         java-version: 8
     - name: Set Go Version
-      if: ${{ inputs.requires-go-18 == 'true'  }}
+      if: ${{ inputs.requires-go-19 == 'true'  }}
       uses: actions/setup-go@v3
       with:
-        go-version: '1.18.0'
+        go-version: '1.19.0'
diff --git a/.github/workflows/build_playground_frontend.yml b/.github/workflows/build_playground_frontend.yml
index d2f64dfddb2..26800b6cfec 100644
--- a/.github/workflows/build_playground_frontend.yml
+++ b/.github/workflows/build_playground_frontend.yml
@@ -34,7 +34,7 @@ jobs:
     name: Build Playground Frontend App
     runs-on: [self-hosted, ubuntu-20.04]
     env:
-      GO_VERSION: 1.18.0
+      GO_VERSION: 1.19.6
       BEAM_VERSION: 2.40.0
       TERRAFORM_VERSION: 1.0.9
       FLUTTER_VERSION: 3.3.2
diff --git a/.github/workflows/go_tests.yml b/.github/workflows/go_tests.yml
index fc772eb1ab2..49cbe902a7b 100644
--- a/.github/workflows/go_tests.yml
+++ b/.github/workflows/go_tests.yml
@@ -44,7 +44,7 @@ jobs:
           fetch-depth: 2
       - uses: actions/setup-go@v3
         with:
-          go-version: '1.18'
+          go-version: '1.19'
       - name: Delete old coverage
         run: "cd sdks/go/pkg && rm -rf .coverage || :"
       - name: Run coverage
diff --git a/.github/workflows/playground_deploy_backend.yml b/.github/workflows/playground_deploy_backend.yml
index b42db802778..02a4c799261 100644
--- a/.github/workflows/playground_deploy_backend.yml
+++ b/.github/workflows/playground_deploy_backend.yml
@@ -34,7 +34,7 @@ jobs:
     name: Build Playground Backend App
     runs-on: ubuntu-latest
     env:
-      GO_VERSION: 1.18.0
+      GO_VERSION: 1.19.6
       BEAM_VERSION: 2.40.0
       TERRAFORM_VERSION: 1.0.9
       STAND_SUFFIX: ''
diff --git a/.github/workflows/python_tests.yml b/.github/workflows/python_tests.yml
index 181a90e1da5..faffbea165f 100644
--- a/.github/workflows/python_tests.yml
+++ b/.github/workflows/python_tests.yml
@@ -179,7 +179,7 @@ jobs:
       - name: Install go
         uses: actions/setup-go@v3
         with:
-          go-version: '1.18'
+          go-version: '1.19'
       - name: Download source from artifacts
         uses: actions/download-artifact@v3
         with:
diff --git a/CHANGES.md b/CHANGES.md
index 252fba0ca04..e5c09743924 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -75,6 +75,7 @@
 * Add `WatchFilePattern` transform, which can be used as a side input to the RunInference PTransfrom to watch for model updates using a file pattern. ([#24042](https://github.com/apache/beam/issues/24042))
 * Add support for loading TorchScript models with `PytorchModelHandler`. The TorchScript model path can be
   passed to PytorchModelHandler using `torch_script_model_path=<path_to_model>`. ([#25321](https://github.com/apache/beam/pull/25321))
+* The Go SDK now requires Go 1.19 to build. ([#25545](https://github.com/apache/beam/pull/25545))
 
 ## Breaking Changes
 
diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index e44b1ff4cb0..7c04d81ef58 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -2015,7 +2015,7 @@ class BeamModulePlugin implements Plugin<Project> {
       def goRootDir = "${project.rootDir}/sdks/go"
 
       // This sets the whole project Go version.
-      project.ext.goVersion = "go1.19.3"
+      project.ext.goVersion = "go1.19.6"
 
       // Minor TODO: Figure out if we can pull out the GOCMD env variable after goPrepare script
       // completion, and avoid this GOBIN substitution.
diff --git a/dev-support/docker/Dockerfile b/dev-support/docker/Dockerfile
index 3e17c9c9677..1301baa041f 100644
--- a/dev-support/docker/Dockerfile
+++ b/dev-support/docker/Dockerfile
@@ -78,7 +78,7 @@ RUN pip3 install distlib==0.3.1 yapf==0.29.0 pytest
 ###
 # Install Go
 ###
-ENV DOWNLOAD_GO_VERSION=1.17.6
+ENV DOWNLOAD_GO_VERSION=1.19.6
 RUN wget https://golang.org/dl/go${DOWNLOAD_GO_VERSION}.linux-amd64.tar.gz && \
     tar -C /usr/local -xzf go${DOWNLOAD_GO_VERSION}.linux-amd64.tar.gz
 ENV GOROOT /usr/local/go
diff --git a/sdks/go.mod b/sdks/go.mod
index ec8a5852788..cd246981352 100644
--- a/sdks/go.mod
+++ b/sdks/go.mod
@@ -20,7 +20,7 @@
 // directory.
 module github.com/apache/beam/sdks/v2
 
-go 1.18
+go 1.19
 
 require (
 	cloud.google.com/go/bigquery v1.45.0
diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md
index 08e47637634..eeda63211b6 100644
--- a/website/www/site/content/en/documentation/programming-guide.md
+++ b/website/www/site/content/en/documentation/programming-guide.md
@@ -39,7 +39,7 @@ The Python SDK supports Python 3.7, 3.8, 3.9, and 3.10.
 {{< /paragraph >}}
 
 {{< paragraph class="language-go">}}
-The Go SDK supports Go v1.18+. SDK release 2.32.0 is the last experimental version.
+The Go SDK supports Go v1.19+. SDK release 2.32.0 is the last experimental version.
 {{< /paragraph >}}
 
 {{< paragraph class="language-typescript">}}
diff --git a/website/www/site/content/en/get-started/quickstart-go.md b/website/www/site/content/en/get-started/quickstart-go.md
index 6c82f10e30f..c2504205aa6 100644
--- a/website/www/site/content/en/get-started/quickstart-go.md
+++ b/website/www/site/content/en/get-started/quickstart-go.md
@@ -25,7 +25,7 @@ If you're interested in contributing to the Apache Beam Go codebase, see the [Co
 
 ## Set up your environment
 
-The Beam SDK for Go requires `go` version 1.18 or newer. It can be downloaded [here](https://golang.org/). Check that you have version 1.18 by running:
+The Beam SDK for Go requires `go` version 1.19 or newer. It can be downloaded [here](https://golang.org/). Check that you have version 1.19 by running:
 
 {{< highlight >}}
 $ go version


[beam] 08/13: [prism] add windowing strategy (#25518)

Posted by lo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch prism-jobservices
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 9c6eb255411e3ff8ee433742425f8469e648818b
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Fri Feb 17 19:18:07 2023 -0800

    [prism] add windowing strategy (#25518)
    
    Co-authored-by: lostluck <13...@users.noreply.github.com>
---
 .../beam/runners/prism/internal/engine/strategy.go | 50 ++++++++++++++++++++++
 .../runners/prism/internal/engine/strategy_test.go | 45 +++++++++++++++++++
 2 files changed, 95 insertions(+)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go
new file mode 100644
index 00000000000..44e6064958c
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package engine
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+)
+
+type winStrat interface {
+	EarliestCompletion(typex.Window) mtime.Time
+}
+
+type defaultStrat struct{}
+
+func (ws defaultStrat) EarliestCompletion(w typex.Window) mtime.Time {
+	return w.MaxTimestamp()
+}
+
+func (defaultStrat) String() string {
+	return "default"
+}
+
+type sessionStrat struct {
+	GapSize time.Duration
+}
+
+func (ws sessionStrat) EarliestCompletion(w typex.Window) mtime.Time {
+	return w.MaxTimestamp().Add(ws.GapSize)
+}
+
+func (ws sessionStrat) String() string {
+	return fmt.Sprintf("session[GapSize:%v]", ws.GapSize)
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy_test.go b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy_test.go
new file mode 100644
index 00000000000..9d558396f80
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy_test.go
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package engine
+
+import (
+	"testing"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+)
+
+func TestEarliestCompletion(t *testing.T) {
+	tests := []struct {
+		strat winStrat
+		input typex.Window
+		want  mtime.Time
+	}{
+		{defaultStrat{}, window.GlobalWindow{}, mtime.EndOfGlobalWindowTime},
+		{defaultStrat{}, window.IntervalWindow{Start: 0, End: 4}, 3},
+		{defaultStrat{}, window.IntervalWindow{Start: mtime.MinTimestamp, End: mtime.MaxTimestamp}, mtime.MaxTimestamp - 1},
+		{sessionStrat{}, window.IntervalWindow{Start: 0, End: 4}, 3},
+		{sessionStrat{GapSize: 3 * time.Millisecond}, window.IntervalWindow{Start: 0, End: 4}, 6},
+	}
+
+	for _, test := range tests {
+		if got, want := test.strat.EarliestCompletion(test.input), test.want; got != want {
+			t.Errorf("%v.EarliestCompletion(%v)) = %v, want %v", test.strat, test.input, got, want)
+		}
+	}
+}


[beam] 07/13: Stop paying the iterator object creation tax in MultiplexingMetricTrackingFnDataReceiver (#25540)

Posted by lo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch prism-jobservices
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 3e321d159342b0edbf5911a18ed7dafc467041ca
Author: Luke Cwik <lu...@gmail.com>
AuthorDate: Fri Feb 17 16:59:05 2023 -0800

    Stop paying the iterator object creation tax in MultiplexingMetricTrackingFnDataReceiver (#25540)
    
    * Stop paying the iterator object creation tax in MultiplexingMetricTrackingFnDataReceiver
    
    This removes the overhead shown in https://user-images.githubusercontent.com/10078956/219762523-1e76e849-18b9-4c40-a513-000364baea52.png
    
    This is for #21250
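    
    For illustration only, a minimal sketch (not part of the commit; class and method names hypothetical) of the loop pattern involved: an enhanced for-loop allocates an Iterator on every invocation, while an index-based loop over a random-access List does not.
    
    ```java
    import java.util.List;
    
    // Hypothetical helper contrasting the two loop shapes on a hot path.
    class HotLoopSketch<T> {
      // Allocates an Iterator object on every call.
      void forEachLoop(List<T> consumers) {
        for (T consumer : consumers) {
          handle(consumer);
        }
      }
    
      // No Iterator allocation; size() is hoisted out of the loop condition.
      void indexedLoop(List<T> consumers) {
        for (int size = consumers.size(), i = 0; i < size; ++i) {
          handle(consumers.get(i));
        }
      }
    
      void handle(T consumer) {}
    }
    ```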
---
 .../beam/fn/harness/data/PCollectionConsumerRegistry.java      | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
index 45298a68d98..60b25d8b137 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
@@ -48,7 +48,6 @@ import org.apache.beam.sdk.metrics.Distribution;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 
 /**
  * The {@code PCollectionConsumerRegistry} is used to maintain a collection of consuming
@@ -209,8 +208,7 @@ public class PCollectionConsumerRegistry {
             come up in the existing SDF expansion, but might be useful to support fused SDF nodes.
             This would require dedicated delivery of the split results to each of the consumers
             separately. */
-            return new MultiplexingMetricTrackingFnDataReceiver(
-                pcId, coder, ImmutableList.copyOf(consumerAndMetadatas));
+            return new MultiplexingMetricTrackingFnDataReceiver(pcId, coder, consumerAndMetadatas);
           }
         });
   }
@@ -351,8 +349,10 @@ public class PCollectionConsumerRegistry {
 
       // Use the ExecutionStateTracker and enter an appropriate state to track the
       // Process Bundle Execution time metric and also ensure user counters can get an appropriate
-      // metrics container.
-      for (ConsumerAndMetadata consumerAndMetadata : consumerAndMetadatas) {
+      // metrics container. We specifically don't use a for-each loop since it creates an iterator
+      // on a hot path.
+      for (int size = consumerAndMetadatas.size(), i = 0; i < size; ++i) {
+        ConsumerAndMetadata consumerAndMetadata = consumerAndMetadatas.get(i);
         ExecutionState state = consumerAndMetadata.getExecutionState();
         state.activate();
         try {


[beam] 11/13: [#24789][prism] internal/worker + tentative data (#25478)

Posted by lo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch prism-jobservices
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 7bb493edde20e00af8ac1fbff9214a0d54b3c1af
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Sun Feb 19 16:33:51 2023 -0800

    [#24789][prism] internal/worker + tentative data (#25478)
    
    * [prism] internal/worker + tentative data
    
    * [prism] B method comments.
    
    * [prism] worker PR comments
    
    ---------
    
    Co-authored-by: lostluck <13...@users.noreply.github.com>
---
 .../pkg/beam/runners/prism/internal/engine/data.go |  30 ++
 .../beam/runners/prism/internal/worker/bundle.go   | 114 ++++++
 .../runners/prism/internal/worker/bundle_test.go   |  52 +++
 .../beam/runners/prism/internal/worker/worker.go   | 421 +++++++++++++++++++++
 .../runners/prism/internal/worker/worker_test.go   | 281 ++++++++++++++
 5 files changed, 898 insertions(+)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/data.go b/sdks/go/pkg/beam/runners/prism/internal/engine/data.go
new file mode 100644
index 00000000000..6fc192ac83b
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/data.go
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package engine
+
+// TentativeData is where data for in progress bundles is put
+// until the bundle executes successfully.
+type TentativeData struct {
+	Raw map[string][][]byte
+}
+
+// WriteData adds data to a given global collectionID.
+func (d *TentativeData) WriteData(colID string, data []byte) {
+	if d.Raw == nil {
+		d.Raw = map[string][][]byte{}
+	}
+	d.Raw[colID] = append(d.Raw[colID], data)
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
new file mode 100644
index 00000000000..f6fbf1293f4
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
@@ -0,0 +1,114 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package worker
+
+import (
+	"sync"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+	fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine"
+	"golang.org/x/exp/slog"
+)
+
+// B represents an extant ProcessBundle instruction sent to an SDK worker.
+// Generally manipulated by another package to interact with a worker.
+type B struct {
+	InstID string // ID for the instruction processing this bundle.
+	PBDID  string // ID for the ProcessBundleDescriptor
+
+	// InputTransformID is data being sent to the SDK.
+	InputTransformID string
+	InputData        [][]byte // Data specifically for this bundle.
+
+	// TODO change to a single map[tid] -> map[input] -> map[window] -> struct { Iter data, MultiMap data } instead of all maps.
+	// IterableSideInputData is a map from transformID, to inputID, to window, to data.
+	IterableSideInputData map[string]map[string]map[typex.Window][][]byte
+	// MultiMapSideInputData is a map from transformID, to inputID, to window, to data key, to data values.
+	MultiMapSideInputData map[string]map[string]map[typex.Window]map[string][][]byte
+
+	// OutputCount is the number of data outputs this bundle has.
+	// We need to see this many closed data channels before the bundle is complete.
+	OutputCount int
+	// dataWait is how we determine if a bundle is finished, by waiting for each of
+	// a Bundle's DataSinks to produce their last output.
+	// After this point we can "commit" the bundle's output for downstream use.
+	dataWait   sync.WaitGroup
+	OutputData engine.TentativeData
+	Resp       chan *fnpb.ProcessBundleResponse
+
+	SinkToPCollection map[string]string
+
+	// TODO: Metrics for this bundle, can be handled after the fact.
+}
+
+// Init initializes the bundle's internal state for waiting on all
+// data and for relaying a response back.
+func (b *B) Init() {
+	// We need to see final data signals that match the number of
+	// outputs the stage this bundle executes posesses
+	b.dataWait.Add(b.OutputCount)
+	b.Resp = make(chan *fnpb.ProcessBundleResponse, 1)
+}
+
+func (b *B) LogValue() slog.Value {
+	return slog.GroupValue(
+		slog.String("ID", b.InstID),
+		slog.String("stage", b.PBDID))
+}
+
+// ProcessOn executes the given bundle on the given W, blocking
+// until all data is complete.
+//
+// Assumes the bundle is initialized (all maps are non-nil, and data waitgroup is set, response channel initialized)
+// Assumes the bundle descriptor is already registered with the W.
+//
+// While this method mostly manipulates a W, putting it on a B avoids mixing the workers
+// public GRPC APIs up with local calls.
+func (b *B) ProcessOn(wk *W) {
+	wk.mu.Lock()
+	wk.bundles[b.InstID] = b
+	wk.mu.Unlock()
+
+	slog.Debug("processing", "bundle", b, "worker", wk)
+
+	// Tell the SDK to start processing the bundle.
+	wk.InstReqs <- &fnpb.InstructionRequest{
+		InstructionId: b.InstID,
+		Request: &fnpb.InstructionRequest_ProcessBundle{
+			ProcessBundle: &fnpb.ProcessBundleRequest{
+				ProcessBundleDescriptorId: b.PBDID,
+			},
+		},
+	}
+
+	// TODO: make batching decisions.
+	for i, d := range b.InputData {
+		wk.DataReqs <- &fnpb.Elements{
+			Data: []*fnpb.Elements_Data{
+				{
+					InstructionId: b.InstID,
+					TransformId:   b.InputTransformID,
+					Data:          d,
+					IsLast:        i+1 == len(b.InputData),
+				},
+			},
+		}
+	}
+
+	slog.Debug("waiting on data", "bundle", b)
+	b.dataWait.Wait() // Wait until data is ready.
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle_test.go b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle_test.go
new file mode 100644
index 00000000000..154306c3f6b
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle_test.go
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package worker
+
+import (
+	"bytes"
+	"sync"
+	"testing"
+)
+
+func TestBundle_ProcessOn(t *testing.T) {
+	wk := New("test")
+	b := &B{
+		InstID:      "testInst",
+		PBDID:       "testPBDID",
+		OutputCount: 1,
+		InputData:   [][]byte{{1, 2, 3}},
+	}
+	b.Init()
+	var completed sync.WaitGroup
+	completed.Add(1)
+	go func() {
+		b.ProcessOn(wk)
+		completed.Done()
+	}()
+	b.dataWait.Done()
+	gotData := <-wk.DataReqs
+	if got, want := gotData.GetData()[0].GetData(), []byte{1, 2, 3}; !bytes.EqualFold(got, want) {
+		t.Errorf("ProcessOn(): data not sent; got %v, want %v", got, want)
+	}
+
+	gotInst := <-wk.InstReqs
+	if got, want := gotInst.GetInstructionId(), b.InstID; got != want {
+		t.Errorf("ProcessOn(): bad instruction ID; got %v, want %v", got, want)
+	}
+	if got, want := gotInst.GetProcessBundle().GetProcessBundleDescriptorId(), b.PBDID; got != want {
+		t.Errorf("ProcessOn(): bad process bundle descriptor ID; got %v, want %v", got, want)
+	}
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
new file mode 100644
index 00000000000..8458ce39e11
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
@@ -0,0 +1,421 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package worker handles interactions with SDK side workers, representing
+// the worker services, communicating with those services, and SDK environments.
+package worker
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"io"
+	"net"
+	"sync"
+	"sync/atomic"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+	fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine"
+	"golang.org/x/exp/slog"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+	"google.golang.org/protobuf/encoding/prototext"
+)
+
+// A W manages worker environments, sending them work
+// that they're able to execute, and manages the server
+// side handlers for FnAPI RPCs.
+type W struct {
+	fnpb.UnimplementedBeamFnControlServer
+	fnpb.UnimplementedBeamFnDataServer
+	fnpb.UnimplementedBeamFnStateServer
+	fnpb.UnimplementedBeamFnLoggingServer
+
+	ID string
+
+	// Server management
+	lis    net.Listener
+	server *grpc.Server
+
+	// These are the ID sources
+	inst, bund uint64
+
+	InstReqs chan *fnpb.InstructionRequest
+	DataReqs chan *fnpb.Elements
+
+	mu          sync.Mutex
+	bundles     map[string]*B                            // Bundles keyed by InstructionID
+	Descriptors map[string]*fnpb.ProcessBundleDescriptor // Stages keyed by PBDID
+
+	D *DataService
+}
+
+// New starts the worker server components of FnAPI Execution.
+func New(id string) *W {
+	lis, err := net.Listen("tcp", ":0")
+	if err != nil {
+		panic(fmt.Sprintf("failed to listen: %v", err))
+	}
+	var opts []grpc.ServerOption
+	wk := &W{
+		ID:     id,
+		lis:    lis,
+		server: grpc.NewServer(opts...),
+
+		InstReqs: make(chan *fnpb.InstructionRequest, 10),
+		DataReqs: make(chan *fnpb.Elements, 10),
+
+		bundles:     make(map[string]*B),
+		Descriptors: make(map[string]*fnpb.ProcessBundleDescriptor),
+
+		D: &DataService{},
+	}
+	slog.Info("Serving Worker components", slog.String("endpoint", wk.Endpoint()))
+	fnpb.RegisterBeamFnControlServer(wk.server, wk)
+	fnpb.RegisterBeamFnDataServer(wk.server, wk)
+	fnpb.RegisterBeamFnLoggingServer(wk.server, wk)
+	fnpb.RegisterBeamFnStateServer(wk.server, wk)
+	return wk
+}
+
+func (wk *W) Endpoint() string {
+	return wk.lis.Addr().String()
+}
+
+// Serve serves on the started listener. Blocks.
+func (wk *W) Serve() {
+	wk.server.Serve(wk.lis)
+}
+
+func (wk *W) String() string {
+	return "worker[" + wk.ID + "]"
+}
+
+func (wk *W) LogValue() slog.Value {
+	return slog.GroupValue(
+		slog.String("ID", wk.ID),
+		slog.String("endpoint", wk.Endpoint()),
+	)
+}
+
+// Stop the GRPC server.
+func (wk *W) Stop() {
+	slog.Debug("stopping", "worker", wk)
+	close(wk.InstReqs)
+	close(wk.DataReqs)
+	wk.server.Stop()
+	wk.lis.Close()
+	slog.Debug("stopped", "worker", wk)
+}
+
+func (wk *W) NextInst() string {
+	return fmt.Sprintf("inst%03d", atomic.AddUint64(&wk.inst, 1))
+}
+
+func (wk *W) NextStage() string {
+	return fmt.Sprintf("stage%03d", atomic.AddUint64(&wk.bund, 1))
+}
+
+// TODO set logging level.
+var minsev = fnpb.LogEntry_Severity_DEBUG
+
+// Logging relates SDK worker messages back to the job that spawned them.
+// Messages are received from the SDK,
+func (wk *W) Logging(stream fnpb.BeamFnLogging_LoggingServer) error {
+	for {
+		in, err := stream.Recv()
+		if err == io.EOF {
+			return nil
+		}
+		if err != nil {
+			slog.Error("logging.Recv", err, "worker", wk)
+			return err
+		}
+		for _, l := range in.GetLogEntries() {
+			if l.Severity >= minsev {
+				// TODO: Connect to the associated Job for this worker instead of
+				// logging locally for SDK side logging.
+				slog.Log(toSlogSev(l.GetSeverity()), l.GetMessage(),
+					slog.String(slog.SourceKey, l.GetLogLocation()),
+					slog.Time(slog.TimeKey, l.GetTimestamp().AsTime()),
+					"worker", wk,
+				)
+			}
+		}
+	}
+}
+
+func toSlogSev(sev fnpb.LogEntry_Severity_Enum) slog.Level {
+	switch sev {
+	case fnpb.LogEntry_Severity_TRACE:
+		return slog.Level(-8)
+	case fnpb.LogEntry_Severity_DEBUG:
+		return slog.LevelDebug // -4
+	case fnpb.LogEntry_Severity_INFO:
+		return slog.LevelInfo // 0
+	case fnpb.LogEntry_Severity_NOTICE:
+		return slog.Level(2)
+	case fnpb.LogEntry_Severity_WARN:
+		return slog.LevelWarn // 4
+	case fnpb.LogEntry_Severity_ERROR:
+		return slog.LevelError // 8
+	case fnpb.LogEntry_Severity_CRITICAL:
+		return slog.Level(10)
+	}
+	return slog.LevelInfo
+}
+
+func (wk *W) GetProcessBundleDescriptor(ctx context.Context, req *fnpb.GetProcessBundleDescriptorRequest) (*fnpb.ProcessBundleDescriptor, error) {
+	desc, ok := wk.Descriptors[req.GetProcessBundleDescriptorId()]
+	if !ok {
+		return nil, fmt.Errorf("descriptor %v not found", req.GetProcessBundleDescriptorId())
+	}
+	return desc, nil
+}
+
+// Control relays instructions to SDKs and back again, coordinated via unique instructionIDs.
+//
+// Requests come from the runner, and are sent to the client in the SDK.
+func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error {
+	done := make(chan bool)
+	go func() {
+		for {
+			resp, err := ctrl.Recv()
+			if err == io.EOF {
+				slog.Debug("ctrl.Recv finished; marking done", "worker", wk)
+				done <- true // means stream is finished
+				return
+			}
+			if err != nil {
+				switch status.Code(err) {
+				case codes.Canceled: // Might ignore this all the time instead.
+					slog.Error("ctrl.Recv Canceled", err, "worker", wk)
+					done <- true // means stream is finished
+					return
+				default:
+					slog.Error("ctrl.Recv failed", err, "worker", wk)
+					panic(err)
+				}
+			}
+
+			// TODO: Do more than assume these are ProcessBundleResponses.
+			wk.mu.Lock()
+			if b, ok := wk.bundles[resp.GetInstructionId()]; ok {
+				// TODO. Better pipeline error handling.
+				if resp.Error != "" {
+					slog.Log(slog.LevelError, "ctrl.Recv pipeline error", slog.ErrorKey, resp.GetError())
+					panic(resp.GetError())
+				}
+				b.Resp <- resp.GetProcessBundle()
+			} else {
+				slog.Debug("ctrl.Recv: %v", resp)
+			}
+			wk.mu.Unlock()
+		}
+	}()
+
+	for req := range wk.InstReqs {
+		ctrl.Send(req)
+	}
+	slog.Debug("ctrl.Send finished waiting on done")
+	<-done
+	slog.Debug("Control done")
+	return nil
+}
+
+// Data relays elements and timer bytes to SDKs and back again, coordinated via
+// ProcessBundle instructionIDs, and receiving input transforms.
+//
+// Data is multiplexed on a single stream for all active bundles on a worker.
+func (wk *W) Data(data fnpb.BeamFnData_DataServer) error {
+	go func() {
+		for {
+			resp, err := data.Recv()
+			if err == io.EOF {
+				return
+			}
+			if err != nil {
+				switch status.Code(err) {
+				case codes.Canceled:
+					slog.Error("data.Recv Canceled", err, "worker", wk)
+					return
+				default:
+					slog.Error("data.Recv failed", err, "worker", wk)
+					panic(err)
+				}
+			}
+			wk.mu.Lock()
+			for _, d := range resp.GetData() {
+				b, ok := wk.bundles[d.GetInstructionId()]
+				if !ok {
+					slog.Info("data.Recv for unknown bundle", "response", resp)
+					continue
+				}
+				colID := b.SinkToPCollection[d.GetTransformId()]
+
+				// There might not be data, eg. for side inputs, so we need to reconcile this elsewhere for
+				// downstream side inputs.
+				if len(d.GetData()) > 0 {
+					b.OutputData.WriteData(colID, d.GetData())
+				}
+				if d.GetIsLast() {
+					b.dataWait.Done()
+				}
+			}
+			wk.mu.Unlock()
+		}
+	}()
+
+	for req := range wk.DataReqs {
+		if err := data.Send(req); err != nil {
+			slog.Log(slog.LevelDebug, "data.Send error", slog.ErrorKey, err)
+		}
+	}
+	return nil
+}
+
+// State relays elements and timer bytes to SDKs and back again, coordinated via
+// ProcessBundle instructionIDs, and receiving input transforms.
+//
+// State requests come from SDKs, and the runner responds.
+func (wk *W) State(state fnpb.BeamFnState_StateServer) error {
+	responses := make(chan *fnpb.StateResponse)
+	go func() {
+		// This go routine creates all responses to state requests from the worker
+		// so we want to close the State handler when it's all done.
+		defer close(responses)
+		for {
+			req, err := state.Recv()
+			if err == io.EOF {
+				return
+			}
+			if err != nil {
+				switch status.Code(err) {
+				case codes.Canceled:
+					slog.Error("state.Recv Canceled", err, "worker", wk)
+					return
+				default:
+					slog.Error("state.Recv failed", err, "worker", wk)
+					panic(err)
+				}
+			}
+			switch req.GetRequest().(type) {
+			case *fnpb.StateRequest_Get:
+				// TODO: move data handling to be pcollection based.
+				b := wk.bundles[req.GetInstructionId()]
+				key := req.GetStateKey()
+				slog.Debug("StateRequest_Get", prototext.Format(req), "bundle", b)
+
+				var data [][]byte
+				switch key.GetType().(type) {
+				case *fnpb.StateKey_IterableSideInput_:
+					ikey := key.GetIterableSideInput()
+					wKey := ikey.GetWindow()
+					var w typex.Window
+					if len(wKey) == 0 {
+						w = window.GlobalWindow{}
+					} else {
+						w, err = exec.MakeWindowDecoder(coder.NewIntervalWindow()).DecodeSingle(bytes.NewBuffer(wKey))
+						if err != nil {
+							panic(fmt.Sprintf("error decoding iterable side input window key %v: %v", wKey, err))
+						}
+					}
+					winMap := b.IterableSideInputData[ikey.GetTransformId()][ikey.GetSideInputId()]
+					var wins []typex.Window
+					for w := range winMap {
+						wins = append(wins, w)
+					}
+					slog.Debug(fmt.Sprintf("side input[%v][%v] I Key: %v Windows: %v", req.GetId(), req.GetInstructionId(), w, wins))
+					data = winMap[w]
+
+				case *fnpb.StateKey_MultimapSideInput_:
+					mmkey := key.GetMultimapSideInput()
+					wKey := mmkey.GetWindow()
+					var w typex.Window
+					if len(wKey) == 0 {
+						w = window.GlobalWindow{}
+					} else {
+						w, err = exec.MakeWindowDecoder(coder.NewIntervalWindow()).DecodeSingle(bytes.NewBuffer(wKey))
+						if err != nil {
+							panic(fmt.Sprintf("error decoding iterable side input window key %v: %v", wKey, err))
+						}
+					}
+					dKey := mmkey.GetKey()
+					winMap := b.MultiMapSideInputData[mmkey.GetTransformId()][mmkey.GetSideInputId()]
+					var wins []typex.Window
+					for w := range winMap {
+						wins = append(wins, w)
+					}
+					slog.Debug(fmt.Sprintf("side input[%v][%v] MM Key: %v Windows: %v", req.GetId(), req.GetInstructionId(), w, wins))
+
+					data = winMap[w][string(dKey)]
+
+				default:
+					panic(fmt.Sprintf("unsupported StateKey Access type: %T: %v", key.GetType(), prototext.Format(key)))
+				}
+
+				// Encode the runner iterable (no length, just consecutive elements), and send it out.
+				// This is also where we can handle things like State Backed Iterables.
+				var buf bytes.Buffer
+				for _, value := range data {
+					buf.Write(value)
+				}
+				responses <- &fnpb.StateResponse{
+					Id: req.GetId(),
+					Response: &fnpb.StateResponse_Get{
+						Get: &fnpb.StateGetResponse{
+							Data: buf.Bytes(),
+						},
+					},
+				}
+			default:
+				panic(fmt.Sprintf("unsupported StateRequest kind %T: %v", req.GetRequest(), prototext.Format(req)))
+			}
+		}
+	}()
+	for resp := range responses {
+		if err := state.Send(resp); err != nil {
+			slog.Error("state.Send error", err)
+		}
+	}
+	return nil
+}
+
+// DataService is slated to be deleted in favour of stage based state
+// management for side inputs.
+type DataService struct {
+	// TODO: actually process the data into windows here as well.
+	raw map[string][][]byte
+}
+
+// Commit tentative data to the datastore.
+func (d *DataService) Commit(tent engine.TentativeData) {
+	if d.raw == nil {
+		d.raw = map[string][][]byte{}
+	}
+	for colID, data := range tent.Raw {
+		d.raw[colID] = append(d.raw[colID], data...)
+	}
+}
+
+// GetAllData is a hack for Side Inputs until watermarks are sorted out.
+func (d *DataService) GetAllData(colID string) [][]byte {
+	return d.raw[colID]
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go b/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go
new file mode 100644
index 00000000000..29b3fab92d6
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go
@@ -0,0 +1,281 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package worker
+
+import (
+	"bytes"
+	"context"
+	"net"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+	fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+	"google.golang.org/grpc/test/bufconn"
+)
+
+func TestWorker_New(t *testing.T) {
+	w := New("test")
+	if got, want := w.ID, "test"; got != want {
+		t.Errorf("New(%q) = %v, want %v", want, got, want)
+	}
+}
+
+func TestWorker_NextInst(t *testing.T) {
+	w := New("test")
+
+	instIDs := map[string]struct{}{}
+	for i := 0; i < 100; i++ {
+		instIDs[w.NextInst()] = struct{}{}
+	}
+	if got, want := len(instIDs), 100; got != want {
+		t.Errorf("calling w.NextInst() got %v unique ids, want %v", got, want)
+	}
+}
+
+func TestWorker_NextStage(t *testing.T) {
+	w := New("test")
+
+	stageIDs := map[string]struct{}{}
+	for i := 0; i < 100; i++ {
+		stageIDs[w.NextStage()] = struct{}{}
+	}
+	if got, want := len(stageIDs), 100; got != want {
+		t.Errorf("calling w.NextStage() got %v unique ids, want %v", got, want)
+	}
+}
+
+func TestWorker_GetProcessBundleDescriptor(t *testing.T) {
+	w := New("test")
+
+	id := "available"
+	w.Descriptors[id] = &fnpb.ProcessBundleDescriptor{
+		Id: id,
+	}
+
+	pbd, err := w.GetProcessBundleDescriptor(context.Background(), &fnpb.GetProcessBundleDescriptorRequest{
+		ProcessBundleDescriptorId: id,
+	})
+	if err != nil {
+		t.Errorf("got GetProcessBundleDescriptor(%q) error: %v, want nil", id, err)
+	}
+	if got, want := pbd.GetId(), id; got != want {
+		t.Errorf("got GetProcessBundleDescriptor(%q) = %v, want id %v", id, got, want)
+	}
+
+	pbd, err = w.GetProcessBundleDescriptor(context.Background(), &fnpb.GetProcessBundleDescriptorRequest{
+		ProcessBundleDescriptorId: "unknown",
+	})
+	if err == nil {
+		t.Errorf("got GetProcessBundleDescriptor(%q) = %v, want error", "unknown", pbd)
+	}
+}
+
+func serveTestWorker(t *testing.T) (context.Context, *W, *grpc.ClientConn) {
+	t.Helper()
+	ctx, cancelFn := context.WithCancel(context.Background())
+	t.Cleanup(cancelFn)
+
+	w := New("test")
+	lis := bufconn.Listen(2048)
+	w.lis = lis
+	t.Cleanup(func() { w.Stop() })
+	go w.Serve()
+
+	clientConn, err := grpc.DialContext(ctx, "", grpc.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) {
+		return lis.DialContext(ctx)
+	}), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
+	if err != nil {
+		t.Fatal("couldn't create bufconn grpc connection:", err)
+	}
+	return ctx, w, clientConn
+}
+
+func TestWorker_Logging(t *testing.T) {
+	ctx, _, clientConn := serveTestWorker(t)
+
+	logCli := fnpb.NewBeamFnLoggingClient(clientConn)
+	logStream, err := logCli.Logging(ctx)
+	if err != nil {
+		t.Fatal("couldn't create log client:", err)
+	}
+
+	logStream.Send(&fnpb.LogEntry_List{
+		LogEntries: []*fnpb.LogEntry{{
+			Severity: fnpb.LogEntry_Severity_INFO,
+			Message:  "squeamish ossiphrage",
+		}},
+	})
+
+	// TODO: Connect to the job management service.
+	// At this point job messages are just logged to wherever the prism runner executes
+	// But this should pivot to anyone connecting to the Job Management service for the
+	// job.
+	// In the meantime, sleep to validate execution via coverage.
+	time.Sleep(20 * time.Millisecond)
+}
+
+func TestWorker_Control_HappyPath(t *testing.T) {
+	ctx, wk, clientConn := serveTestWorker(t)
+
+	ctrlCli := fnpb.NewBeamFnControlClient(clientConn)
+	ctrlStream, err := ctrlCli.Control(ctx)
+	if err != nil {
+		t.Fatal("couldn't create control client:", err)
+	}
+
+	instID := wk.NextInst()
+
+	b := &B{}
+	b.Init()
+	wk.bundles[instID] = b
+	b.ProcessOn(wk)
+
+	ctrlStream.Send(&fnpb.InstructionResponse{
+		InstructionId: instID,
+		Response: &fnpb.InstructionResponse_ProcessBundle{
+			ProcessBundle: &fnpb.ProcessBundleResponse{
+				RequiresFinalization: true, // Simple thing to check.
+			},
+		},
+	})
+
+	if err := ctrlStream.CloseSend(); err != nil {
+		t.Errorf("ctrlStream.CloseSend() = %v", err)
+	}
+	resp := <-b.Resp
+
+	if !resp.RequiresFinalization {
+		t.Errorf("got %v, want response that Requires Finalization", resp)
+	}
+}
+
+func TestWorker_Data_HappyPath(t *testing.T) {
+	ctx, wk, clientConn := serveTestWorker(t)
+
+	dataCli := fnpb.NewBeamFnDataClient(clientConn)
+	dataStream, err := dataCli.Data(ctx)
+	if err != nil {
+		t.Fatal("couldn't create data client:", err)
+	}
+
+	instID := wk.NextInst()
+
+	b := &B{
+		InstID: instID,
+		PBDID:  wk.NextStage(),
+		InputData: [][]byte{
+			{1, 1, 1, 1, 1, 1},
+		},
+		OutputCount: 1,
+	}
+	b.Init()
+	wk.bundles[instID] = b
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		b.ProcessOn(wk)
+	}()
+
+	wk.InstReqs <- &fnpb.InstructionRequest{
+		InstructionId: instID,
+	}
+
+	elements, err := dataStream.Recv()
+	if err != nil {
+		t.Fatal("couldn't receive data elements:", err)
+	}
+
+	if got, want := elements.GetData()[0].GetInstructionId(), b.InstID; got != want {
+		t.Fatalf("couldn't receive data elements ID: got %v, want %v", got, want)
+	}
+	if got, want := elements.GetData()[0].GetData(), []byte{1, 1, 1, 1, 1, 1}; !bytes.Equal(got, want) {
+		t.Fatalf("client Data received %v, want %v", got, want)
+	}
+	if got, want := elements.GetData()[0].GetIsLast(), true; got != want {
+		t.Fatalf("client Data received wasn't last: got %v, want %v", got, want)
+	}
+
+	dataStream.Send(elements)
+
+	if err := dataStream.CloseSend(); err != nil {
+		t.Errorf("dataStream.CloseSend() = %v", err)
+	}
+
+	wg.Wait()
+	t.Log("ProcessOn successfully exited")
+}
+
+func TestWorker_State_Iterable(t *testing.T) {
+	ctx, wk, clientConn := serveTestWorker(t)
+
+	stateCli := fnpb.NewBeamFnStateClient(clientConn)
+	stateStream, err := stateCli.State(ctx)
+	if err != nil {
+		t.Fatal("couldn't create state client:", err)
+	}
+
+	instID := wk.NextInst()
+	wk.bundles[instID] = &B{
+		IterableSideInputData: map[string]map[string]map[typex.Window][][]byte{
+			"transformID": {
+				"i1": {
+					window.GlobalWindow{}: [][]byte{
+						{42},
+					},
+				},
+			},
+		},
+	}
+
+	stateStream.Send(&fnpb.StateRequest{
+		Id:            "first",
+		InstructionId: instID,
+		Request: &fnpb.StateRequest_Get{
+			Get: &fnpb.StateGetRequest{},
+		},
+		StateKey: &fnpb.StateKey{Type: &fnpb.StateKey_IterableSideInput_{
+			IterableSideInput: &fnpb.StateKey_IterableSideInput{
+				TransformId: "transformID",
+				SideInputId: "i1",
+				Window:      []byte{}, // Global Windows
+			},
+		}},
+	})
+
+	resp, err := stateStream.Recv()
+	if err != nil {
+		t.Fatal("couldn't receive state response:", err)
+	}
+
+	if got, want := resp.GetId(), "first"; got != want {
+		t.Fatalf("didn't receive expected state response: got %v, want %v", got, want)
+	}
+
+	if got, want := resp.GetGet().GetData(), []byte{42}; !bytes.Equal(got, want) {
+		t.Fatalf("didn't receive expected state response data: got %v, want %v", got, want)
+	}
+
+	if err := stateStream.CloseSend(); err != nil {
+		t.Errorf("stateStream.CloseSend() = %v", err)
+	}
+}
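The State handler above returns iterable side input values as a "runner iterable": the element byte slices for the requested window, concatenated with no leading element count. A minimal standalone sketch of that encoding follows (the helper name encodeRunnerIterable and the sample bytes are illustrative only, not part of this change):

```go
package main

import (
	"bytes"
	"fmt"
)

// encodeRunnerIterable concatenates element byte slices with no length prefix,
// the same shape the State handler places into StateGetResponse.Data.
func encodeRunnerIterable(elements [][]byte) []byte {
	var buf bytes.Buffer
	for _, e := range elements {
		buf.Write(e)
	}
	return buf.Bytes()
}

func main() {
	fmt.Println(encodeRunnerIterable([][]byte{{42}, {43, 44}})) // [42 43 44]
}
```

This mirrors the buf.Write loop in the StateRequest_Get branch, and matches the single {42} element checked in TestWorker_State_Iterable.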


[beam] 03/13: prism-fixstatic (#25546)

Posted by lo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch prism-jobservices
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 03e12fb9d827e32d54faa5484892cdc9e94dc817
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Fri Feb 17 12:54:06 2023 -0800

    prism-fixstatic (#25546)
    
    Co-authored-by: lostluck <13...@users.noreply.github.com>
---
 sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics_test.go | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics_test.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics_test.go
index d06d6774828..e0346731f30 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics_test.go
@@ -23,7 +23,6 @@ import (
 
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
 	fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
-	"github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
 	pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
 	"github.com/google/go-cmp/cmp"
 	"google.golang.org/protobuf/proto"
@@ -73,7 +72,7 @@ func Test_metricsStore_ContributeMetrics(t *testing.T) {
 		// TODO convert input to non-legacy metrics once we support, and then delete these.
 		input [][]*pipepb.MonitoringInfo
 
-		want []*pipeline_v1.MonitoringInfo
+		want []*pipepb.MonitoringInfo
 	}{
 		{
 			name: "int64Sum",


[beam] 05/13: Replace more uses of `ClassLoadingStrategy.Default.INJECTION` (#23210)

Posted by lo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch prism-jobservices
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 2cef59549dd8cf564fddfc2c8be0e8516ac96ddb
Author: Liam Miller-Cushon <cu...@google.com>
AuthorDate: Fri Feb 17 15:20:45 2023 -0800

    Replace more uses of `ClassLoadingStrategy.Default.INJECTION` (#23210)
    
    * Replace more uses of `ClassLoadingStrategy.Default.INJECTION`
    
    in preparation for Java 17+. Follow-up to:
    
    https://github.com/apache/beam/commit/98f1f75459ee300baa5574042149a65063239705.
    
    * Move helper method to a shared ByteBuddyUtils class
    
    * spotlessApply
    
    * Apply suggestions from code review
    
    Co-authored-by: Lukasz Cwik <lc...@google.com>
    
    * Fix nullness issues, add an overload that takes a custom classloader
    
    * spotless apply
    
    * Fix imports
    
    * Update sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteBuddyUtils.java
    
    Co-authored-by: Lukasz Cwik <lc...@google.com>
    
    * Use ISE instead of LinkageError
    
    * Add a missing import
    
    * s/getClassLoader/findClassLoader
    
    * fix Checker Framework error
    
    ---------
    
    Co-authored-by: Lukasz Cwik <lc...@google.com>
---
 .../apache/beam/sdk/coders/RowCoderGenerator.java  |  7 ++-
 .../beam/sdk/schemas/utils/AvroByteBuddyUtils.java |  5 +-
 .../beam/sdk/schemas/utils/ByteBuddyUtils.java     |  4 +-
 .../beam/sdk/schemas/utils/JavaBeanUtils.java      | 11 +++--
 .../apache/beam/sdk/schemas/utils/POJOUtils.java   | 10 ++--
 .../sdk/schemas/utils/SelectByteBuddyHelpers.java  |  7 ++-
 .../reflect/ByteBuddyOnTimerInvokerFactory.java    | 10 ++--
 .../org/apache/beam/sdk/util/ByteBuddyUtils.java   | 57 ++++++++++++++++++++++
 8 files changed, 87 insertions(+), 24 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
index 5ce7358d882..5a37a22e22d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.coders;
 
+import static org.apache.beam.sdk.util.ByteBuddyUtils.getClassLoadingStrategy;
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
 
 import java.io.IOException;
@@ -36,7 +37,6 @@ import net.bytebuddy.description.modifier.Visibility;
 import net.bytebuddy.description.type.TypeDescription;
 import net.bytebuddy.description.type.TypeDescription.ForLoadedType;
 import net.bytebuddy.dynamic.DynamicType;
-import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
 import net.bytebuddy.dynamic.scaffold.InstrumentedType;
 import net.bytebuddy.implementation.FixedValue;
 import net.bytebuddy.implementation.Implementation;
@@ -55,6 +55,7 @@ import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.Field;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
@@ -172,7 +173,9 @@ public abstract class RowCoderGenerator {
         rowCoder =
             builder
                 .make()
-                .load(Coder.class.getClassLoader(), ClassLoadingStrategy.Default.INJECTION)
+                .load(
+                    ReflectHelpers.findClassLoader(schema.getClass().getClassLoader()),
+                    getClassLoadingStrategy(schema.getClass()))
                 .getLoaded()
                 .getDeclaredConstructor(Coder[].class, int[].class)
                 .newInstance((Object) componentCoders, (Object) encodingPosToRowIndex);
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroByteBuddyUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroByteBuddyUtils.java
index eb01b83c94f..4c9055ba245 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroByteBuddyUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroByteBuddyUtils.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.schemas.utils;
 
+import static org.apache.beam.sdk.util.ByteBuddyUtils.getClassLoadingStrategy;
+
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Type;
@@ -25,7 +27,6 @@ import net.bytebuddy.ByteBuddy;
 import net.bytebuddy.asm.AsmVisitorWrapper;
 import net.bytebuddy.description.type.TypeDescription.ForLoadedType;
 import net.bytebuddy.dynamic.DynamicType;
-import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
 import net.bytebuddy.implementation.MethodCall;
 import net.bytebuddy.implementation.bytecode.StackManipulation;
 import net.bytebuddy.implementation.bytecode.assign.TypeCasting;
@@ -99,7 +100,7 @@ class AvroByteBuddyUtils {
           .make()
           .load(
               ReflectHelpers.findClassLoader(clazz.getClassLoader()),
-              ClassLoadingStrategy.Default.INJECTION)
+              getClassLoadingStrategy(clazz))
           .getLoaded()
           .getDeclaredConstructor()
           .newInstance();
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java
index bb0003c4d4d..65eaffb9318 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.schemas.utils;
 
+import static org.apache.beam.sdk.util.ByteBuddyUtils.getClassLoadingStrategy;
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
 
 import java.lang.reflect.Constructor;
@@ -43,7 +44,6 @@ import net.bytebuddy.description.method.MethodDescription.ForLoadedMethod;
 import net.bytebuddy.description.type.TypeDescription;
 import net.bytebuddy.description.type.TypeDescription.ForLoadedType;
 import net.bytebuddy.dynamic.DynamicType;
-import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
 import net.bytebuddy.dynamic.scaffold.InstrumentedType;
 import net.bytebuddy.implementation.Implementation;
 import net.bytebuddy.implementation.Implementation.Context;
@@ -459,7 +459,7 @@ public class ByteBuddyUtils {
         .make()
         .load(
             ReflectHelpers.findClassLoader(((Class) fromType).getClassLoader()),
-            ClassLoadingStrategy.Default.INJECTION)
+            getClassLoadingStrategy((Class) fromType))
         .getLoaded();
   }
 
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java
index ec0dba2c942..49d30f0e345 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.schemas.utils;
 
+import static org.apache.beam.sdk.util.ByteBuddyUtils.getClassLoadingStrategy;
+
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -28,7 +30,6 @@ import net.bytebuddy.ByteBuddy;
 import net.bytebuddy.asm.AsmVisitorWrapper;
 import net.bytebuddy.description.method.MethodDescription.ForLoadedMethod;
 import net.bytebuddy.dynamic.DynamicType;
-import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
 import net.bytebuddy.dynamic.scaffold.InstrumentedType;
 import net.bytebuddy.implementation.FixedValue;
 import net.bytebuddy.implementation.Implementation;
@@ -162,7 +163,7 @@ public class JavaBeanUtils {
           .load(
               ReflectHelpers.findClassLoader(
                   typeInformation.getMethod().getDeclaringClass().getClassLoader()),
-              ClassLoadingStrategy.Default.INJECTION)
+              getClassLoadingStrategy(typeInformation.getMethod().getDeclaringClass()))
           .getLoaded()
           .getDeclaredConstructor()
           .newInstance();
@@ -226,7 +227,7 @@ public class JavaBeanUtils {
           .load(
               ReflectHelpers.findClassLoader(
                   typeInformation.getMethod().getDeclaringClass().getClassLoader()),
-              ClassLoadingStrategy.Default.INJECTION)
+              getClassLoadingStrategy(typeInformation.getMethod().getDeclaringClass()))
           .getLoaded()
           .getDeclaredConstructor()
           .newInstance();
@@ -290,7 +291,7 @@ public class JavaBeanUtils {
           .make()
           .load(
               ReflectHelpers.findClassLoader(clazz.getClassLoader()),
-              ClassLoadingStrategy.Default.INJECTION)
+              getClassLoadingStrategy(clazz))
           .getLoaded()
           .getDeclaredConstructor()
           .newInstance();
@@ -338,7 +339,7 @@ public class JavaBeanUtils {
           .make()
           .load(
               ReflectHelpers.findClassLoader(clazz.getClassLoader()),
-              ClassLoadingStrategy.Default.INJECTION)
+              getClassLoadingStrategy(clazz))
           .getLoaded()
           .getDeclaredConstructor()
           .newInstance();
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java
index 46970c6bc4f..47e2b4eec5f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.schemas.utils;
 
+import static org.apache.beam.sdk.util.ByteBuddyUtils.getClassLoadingStrategy;
+
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
@@ -156,7 +158,7 @@ public class POJOUtils {
           .make()
           .load(
               ReflectHelpers.findClassLoader(clazz.getClassLoader()),
-              ClassLoadingStrategy.Default.INJECTION)
+              getClassLoadingStrategy(clazz))
           .getLoaded()
           .getDeclaredConstructor()
           .newInstance();
@@ -208,7 +210,7 @@ public class POJOUtils {
           .make()
           .load(
               ReflectHelpers.findClassLoader(clazz.getClassLoader()),
-              ClassLoadingStrategy.Default.INJECTION)
+              getClassLoadingStrategy(clazz))
           .getLoaded()
           .getDeclaredConstructor()
           .newInstance();
@@ -299,7 +301,7 @@ public class POJOUtils {
           .make()
           .load(
               ReflectHelpers.findClassLoader(field.getDeclaringClass().getClassLoader()),
-              ClassLoadingStrategy.Default.INJECTION)
+              getClassLoadingStrategy(field.getDeclaringClass()))
           .getLoaded()
           .getDeclaredConstructor()
           .newInstance();
@@ -379,7 +381,7 @@ public class POJOUtils {
           .make()
           .load(
               ReflectHelpers.findClassLoader(field.getDeclaringClass().getClassLoader()),
-              ClassLoadingStrategy.Default.INJECTION)
+              getClassLoadingStrategy(field.getDeclaringClass()))
           .getLoaded()
           .getDeclaredConstructor()
           .newInstance();
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectByteBuddyHelpers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectByteBuddyHelpers.java
index fd3b3735ee5..ec3db9b3cce 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectByteBuddyHelpers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectByteBuddyHelpers.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.schemas.utils;
 
+import static org.apache.beam.sdk.util.ByteBuddyUtils.getClassLoadingStrategy;
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.auto.value.AutoValue;
@@ -36,7 +37,6 @@ import net.bytebuddy.description.modifier.Visibility;
 import net.bytebuddy.description.type.TypeDescription.ForLoadedType;
 import net.bytebuddy.description.type.TypeDescription.Generic;
 import net.bytebuddy.dynamic.DynamicType;
-import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
 import net.bytebuddy.dynamic.scaffold.InstrumentedType;
 import net.bytebuddy.implementation.Implementation;
 import net.bytebuddy.implementation.Implementation.Context;
@@ -66,6 +66,7 @@ import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.IfNullElse;
 import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ShortCircuitReturnNull;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
@@ -177,7 +178,9 @@ class SelectByteBuddyHelpers {
       return builder
           .visit(new AsmVisitorWrapper.ForDeclaredMethods().writerFlags(ClassWriter.COMPUTE_FRAMES))
           .make()
-          .load(Row.class.getClassLoader(), ClassLoadingStrategy.Default.INJECTION)
+          .load(
+              ReflectHelpers.findClassLoader(schemaAndDescriptor.getClass().getClassLoader()),
+              getClassLoadingStrategy(schemaAndDescriptor.getClass()))
           .getLoaded()
           .getDeclaredConstructor(Schema.class)
           .newInstance(outputSchema);
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java
index 49affe696e4..09e6212fa66 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.transforms.reflect;
 
+import static org.apache.beam.sdk.util.ByteBuddyUtils.getClassLoadingStrategy;
 import static org.apache.beam.sdk.util.common.ReflectHelpers.findClassLoader;
 
 import java.lang.reflect.Constructor;
@@ -27,7 +28,6 @@ import net.bytebuddy.description.modifier.FieldManifestation;
 import net.bytebuddy.description.modifier.Visibility;
 import net.bytebuddy.description.type.TypeDescription;
 import net.bytebuddy.dynamic.DynamicType;
-import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
 import net.bytebuddy.dynamic.scaffold.InstrumentedType;
 import net.bytebuddy.dynamic.scaffold.subclass.ConstructorStrategy;
 import net.bytebuddy.implementation.Implementation;
@@ -225,9 +225,7 @@ class ByteBuddyOnTimerInvokerFactory implements OnTimerInvokerFactory {
     Class<? extends OnTimerInvoker<?, ?>> res =
         (Class<? extends OnTimerInvoker<?, ?>>)
             unloaded
-                .load(
-                    findClassLoader(fnClass.getClassLoader()),
-                    ClassLoadingStrategy.Default.INJECTION)
+                .load(findClassLoader(fnClass.getClassLoader()), getClassLoadingStrategy(fnClass))
                 .getLoaded();
     return res;
   }
@@ -277,9 +275,7 @@ class ByteBuddyOnTimerInvokerFactory implements OnTimerInvokerFactory {
     Class<? extends OnTimerInvoker<?, ?>> res =
         (Class<? extends OnTimerInvoker<?, ?>>)
             unloaded
-                .load(
-                    findClassLoader(fnClass.getClassLoader()),
-                    ClassLoadingStrategy.Default.INJECTION)
+                .load(findClassLoader(fnClass.getClassLoader()), getClassLoadingStrategy(fnClass))
                 .getLoaded();
     return res;
   }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteBuddyUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteBuddyUtils.java
new file mode 100644
index 00000000000..3a7dd889d95
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteBuddyUtils.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static java.util.Objects.requireNonNull;
+
+import java.lang.reflect.Method;
+import net.bytebuddy.dynamic.loading.ClassInjector;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
+
+/** Utilities for working with Byte Buddy. */
+public final class ByteBuddyUtils {
+  private ByteBuddyUtils() {} // Non-instantiable
+
+  /** Returns a class loading strategy that is compatible with Java 17+. */
+  public static ClassLoadingStrategy<ClassLoader> getClassLoadingStrategy(Class<?> targetClass) {
+    try {
+      ClassLoadingStrategy<ClassLoader> strategy;
+      if (ClassInjector.UsingLookup.isAvailable()) {
+        ClassLoader classLoader = ReflectHelpers.findClassLoader(targetClass);
+        Class<?> methodHandles = Class.forName("java.lang.invoke.MethodHandles", true, classLoader);
+        @SuppressWarnings("nullness") // MethodHandles#lookup accepts null
+        Object lookup = methodHandles.getMethod("lookup").invoke(null);
+        Class<?> lookupClass =
+            Class.forName("java.lang.invoke.MethodHandles$Lookup", true, classLoader);
+        Method privateLookupIn =
+            methodHandles.getMethod("privateLookupIn", Class.class, lookupClass);
+        @SuppressWarnings("nullness") // this is a static method, the receiver can be null
+        Object privateLookup = requireNonNull(privateLookupIn.invoke(null, targetClass, lookup));
+        strategy = ClassLoadingStrategy.UsingLookup.of(requireNonNull(privateLookup));
+      } else if (ClassInjector.UsingReflection.isAvailable()) {
+        strategy = ClassLoadingStrategy.Default.INJECTION;
+      } else {
+        throw new IllegalStateException("No code generation strategy available");
+      }
+      return strategy;
+    } catch (ReflectiveOperationException e) {
+      throw new IllegalStateException("No code generation strategy available", e);
+    }
+  }
+}


[beam] 10/13: [Go SDK]: Retrieve file size in CreateInitialRestriction in textio.Read (#25535)

Posted by lo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch prism-jobservices
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 1294ed9ba73f3262f294b8475652f1879f9a9e6b
Author: Johanna Öjeling <51...@users.noreply.github.com>
AuthorDate: Sun Feb 19 23:31:25 2023 +0100

    [Go SDK]: Retrieve file size in CreateInitialRestriction in textio.Read (#25535)
    
    * Retrieve file size in CreateInitialRestriction in textio.Read
    
    * Update readFn doc comment
---
 sdks/go/pkg/beam/io/textio/textio.go | 39 ++++++++++++++----------------------
 1 file changed, 15 insertions(+), 24 deletions(-)

diff --git a/sdks/go/pkg/beam/io/textio/textio.go b/sdks/go/pkg/beam/io/textio/textio.go
index a6f909aea1a..ed8be0a42b2 100644
--- a/sdks/go/pkg/beam/io/textio/textio.go
+++ b/sdks/go/pkg/beam/io/textio/textio.go
@@ -34,7 +34,6 @@ import (
 
 func init() {
 	beam.RegisterType(reflect.TypeOf((*readFn)(nil)).Elem())
-	beam.RegisterFunction(sizeFn)
 	beam.RegisterType(reflect.TypeOf((*writeFileFn)(nil)).Elem())
 	beam.RegisterFunction(expandFn)
 }
@@ -82,8 +81,7 @@ func ReadAllSdf(s beam.Scope, col beam.PCollection) beam.PCollection {
 // into separate bundles.
 func read(s beam.Scope, col beam.PCollection) beam.PCollection {
 	files := beam.ParDo(s, expandFn, col)
-	sized := beam.ParDo(s, sizeFn, files)
-	return beam.ParDo(s, &readFn{}, sized)
+	return beam.ParDo(s, &readFn{}, files)
 }
 
 // expandFn expands a glob pattern into all matching file names.
@@ -108,36 +106,29 @@ func expandFn(ctx context.Context, glob string, emit func(string)) error {
 	return nil
 }
 
-// sizeFn pairs a filename with the size of that file in bytes.
-// TODO(https://github.com/apache/beam/issues/20607): Once CreateInitialRestriction supports Context params and
-// error return values, this can be done in readSdfFn.CreateInitialRestriction.
-func sizeFn(ctx context.Context, filename string) (string, int64, error) {
+// readFn reads individual lines from a text file. Implemented as an SDF
+// to allow splitting within a file.
+type readFn struct {
+}
+
+// CreateInitialRestriction creates an offset range restriction representing
+// the file's size in bytes.
+func (fn *readFn) CreateInitialRestriction(ctx context.Context, filename string) (offsetrange.Restriction, error) {
 	fs, err := filesystem.New(ctx, filename)
 	if err != nil {
-		return "", -1, err
+		return offsetrange.Restriction{}, err
 	}
 	defer fs.Close()
 
 	size, err := fs.Size(ctx, filename)
 	if err != nil {
-		return "", -1, err
+		return offsetrange.Restriction{}, err
 	}
-	return filename, size, nil
-}
 
-// readFn reads individual lines from a text file, given a filename and a
-// size in bytes for that file. Implemented as an SDF to allow splitting
-// within a file.
-type readFn struct {
-}
-
-// CreateInitialRestriction creates an offset range restriction representing
-// the file, using the paired size rather than fetching the file's size.
-func (fn *readFn) CreateInitialRestriction(_ string, size int64) offsetrange.Restriction {
 	return offsetrange.Restriction{
 		Start: 0,
 		End:   size,
-	}
+	}, nil
 }
 
 const (
@@ -150,7 +141,7 @@ const (
 
 // SplitRestriction splits each file restriction into blocks of a predetermined
 // size, with some checks to avoid having small remainders.
-func (fn *readFn) SplitRestriction(_ string, _ int64, rest offsetrange.Restriction) []offsetrange.Restriction {
+func (fn *readFn) SplitRestriction(_ string, rest offsetrange.Restriction) []offsetrange.Restriction {
 	splits := rest.SizedSplits(blockSize)
 	numSplits := len(splits)
 	if numSplits > 1 {
@@ -165,7 +156,7 @@ func (fn *readFn) SplitRestriction(_ string, _ int64, rest offsetrange.Restricti
 }
 
 // Size returns the size of each restriction as its range.
-func (fn *readFn) RestrictionSize(_ string, _ int64, rest offsetrange.Restriction) float64 {
+func (fn *readFn) RestrictionSize(_ string, rest offsetrange.Restriction) float64 {
 	return rest.Size()
 }
 
@@ -183,7 +174,7 @@ func (fn *readFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker
 // begin within the restriction and past the restriction (those are entirely
 // output, including the portion outside the restriction). In some cases a
 // valid restriction might not output any lines.
-func (fn *readFn) ProcessElement(ctx context.Context, rt *sdf.LockRTracker, filename string, _ int64, emit func(string)) error {
+func (fn *readFn) ProcessElement(ctx context.Context, rt *sdf.LockRTracker, filename string, emit func(string)) error {
 	log.Infof(ctx, "Reading from %v", filename)
 
 	fs, err := filesystem.New(ctx, filename)
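With the size lookup folded into CreateInitialRestriction, callers of textio.Read are unchanged. A hedged sketch of typical usage (the bucket path is a placeholder and the debug.Print step is purely illustrative):

```go
package main

import (
	"context"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
)

func main() {
	beam.Init()
	p, s := beam.NewPipelineWithRoot()
	// Each matching file is expanded, sized in CreateInitialRestriction, and
	// split into offset-range restrictions; each line becomes one string element.
	lines := textio.Read(s, "gs://my-bucket/input/*.txt")
	debug.Print(s, lines)
	if err := beamx.Run(context.Background(), p); err != nil {
		panic(err)
	}
}
```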


[beam] 09/13: Fix Tensorflow integration test model path (#25553)

Posted by lo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch prism-jobservices
in repository https://gitbox.apache.org/repos/asf/beam.git

commit d7e879496495d3d74058eba6196e6d6bcccbfd60
Author: Anand Inguva <34...@users.noreply.github.com>
AuthorDate: Sun Feb 19 11:51:51 2023 -0500

    Fix Tensorflow integration test model path (#25553)
    
    * Fix model path
    
    * Fix tox
---
 .../python/apache_beam/ml/inference/tensorflow_inference_it_test.py | 6 +++---
 sdks/python/tox.ini                                                 | 1 -
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/sdks/python/apache_beam/ml/inference/tensorflow_inference_it_test.py b/sdks/python/apache_beam/ml/inference/tensorflow_inference_it_test.py
index 7b4b13ce2e1..3c92461c15a 100644
--- a/sdks/python/apache_beam/ml/inference/tensorflow_inference_it_test.py
+++ b/sdks/python/apache_beam/ml/inference/tensorflow_inference_it_test.py
@@ -51,9 +51,9 @@ class TensorflowInference(unittest.TestCase):
   def test_tf_mnist_classification(self):
     test_pipeline = TestPipeline(is_integration_test=True)
     input_file = 'gs://apache-beam-ml/testing/inputs/it_mnist_data.csv'
-    output_file_dir = 'apache-beam-ml/testing/outputs'
+    output_file_dir = 'gs://apache-beam-ml/testing/outputs'
     output_file = '/'.join([output_file_dir, str(uuid.uuid4()), 'result.txt'])
-    model_path = 'apache-beam-ml/models/tensorflow/mnist/'
+    model_path = 'gs://apache-beam-ml/models/tensorflow/mnist/'
     extra_opts = {
         'input': input_file,
         'output': output_file,
@@ -85,7 +85,7 @@ class TensorflowInference(unittest.TestCase):
     image_dir = (
         'https://storage.googleapis.com/download.tensorflow.org/example_images/'
     )
-    output_file_dir = 'apache-beam-ml/testing/outputs'
+    output_file_dir = 'gs://apache-beam-ml/testing/outputs'
     output_file = '/'.join([output_file_dir, str(uuid.uuid4()), 'result.txt'])
     model_path = (
         'https://tfhub.dev/google/tf2-preview/mobilenet_v2/classification/4')
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index 5b7e10bf12a..c21e384ca86 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -344,7 +344,6 @@ commands =
   # Run all ONNX unit tests
   pytest -o junit_suite_name={envname} --junitxml=pytest_{envname}.xml -n 6 -m uses_onnx {posargs}
   
-[testenv:py{37,38,39,310}-tf-{211}]
 [testenv:py{37,38,39,310}-tensorflow-{29,210,211}]
 deps =
   -r build-requirements.txt


[beam] 02/13: Run prbot updates on hosted runners (#25544)

Posted by lo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch prism-jobservices
in repository https://gitbox.apache.org/repos/asf/beam.git

commit b24e3850100f3c08d06d72412fe3f5b33f49d191
Author: Danny McCormick <da...@google.com>
AuthorDate: Fri Feb 17 15:34:04 2023 -0500

    Run prbot updates on hosted runners (#25544)
---
 .github/workflows/pr-bot-pr-updates.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/pr-bot-pr-updates.yml b/.github/workflows/pr-bot-pr-updates.yml
index 01e6a31673c..d96a11368cb 100644
--- a/.github/workflows/pr-bot-pr-updates.yml
+++ b/.github/workflows/pr-bot-pr-updates.yml
@@ -31,7 +31,7 @@ jobs:
 
     # Don't run on forks
     if: github.repository == 'apache/beam'
-    runs-on: [self-hosted, ubuntu-20.04]
+    runs-on: ubuntu-latest
 
     steps:
       # Pin to master so users can't do anything malicious on their own branch and run it here.


[beam] 13/13: [prism] minimum required job services

Posted by lo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch prism-jobservices
in repository https://gitbox.apache.org/repos/asf/beam.git

commit da30883507668e4cfb1f378186a47aa5dce201be
Author: Robert Burke <ro...@frantil.com>
AuthorDate: Sun Feb 19 06:25:05 2023 -0800

    [prism] minimum required job services
---
 .../runners/prism/internal/jobservices/artifact.go |  81 ++++++++++++
 .../beam/runners/prism/internal/jobservices/job.go | 120 +++++++++++++++++
 .../prism/internal/jobservices/management.go       | 142 +++++++++++++++++++++
 .../runners/prism/internal/jobservices/metrics.go  |   2 -
 .../runners/prism/internal/jobservices/server.go   |  83 ++++++++++++
 .../prism/internal/jobservices/server_test.go      |  79 ++++++++++++
 6 files changed, 505 insertions(+), 2 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/artifact.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/artifact.go
new file mode 100644
index 00000000000..7ed88e5475e
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/artifact.go
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package jobservices
+
+import (
+	"fmt"
+	"io"
+
+	jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
+	"golang.org/x/exp/slog"
+)
+
+func (s *Server) ReverseArtifactRetrievalService(stream jobpb.ArtifactStagingService_ReverseArtifactRetrievalServiceServer) error {
+	in, err := stream.Recv()
+	if err == io.EOF {
+		return nil
+	}
+	if err != nil {
+		return err
+	}
+	job := s.jobs[in.GetStagingToken()]
+
+	envs := job.Pipeline.GetComponents().GetEnvironments()
+	for _, env := range envs {
+		for _, dep := range env.GetDependencies() {
+			slog.Debug("GetArtifact start",
+				slog.Group("dep",
+					slog.String("urn", dep.GetTypeUrn()),
+					slog.String("payload", string(dep.GetTypePayload()))))
+			stream.Send(&jobpb.ArtifactRequestWrapper{
+				Request: &jobpb.ArtifactRequestWrapper_GetArtifact{
+					GetArtifact: &jobpb.GetArtifactRequest{
+						Artifact: dep,
+					},
+				},
+			})
+			var count int
+			for {
+				in, err := stream.Recv()
+				if err == io.EOF {
+					return nil
+				}
+				if err != nil {
+					return err
+				}
+				if in.IsLast {
+					slog.Debug("GetArtifact finish",
+						slog.Group("dep",
+							slog.String("urn", dep.GetTypeUrn()),
+							slog.String("payload", string(dep.GetTypePayload()))),
+						slog.Int("bytesReceived", count))
+					break
+				}
+				// Here's where we go through each environment's artifacts.
+				// We do nothing with them.
+				switch req := in.GetResponse().(type) {
+				case *jobpb.ArtifactResponseWrapper_GetArtifactResponse:
+					count += len(req.GetArtifactResponse.GetData())
+				case *jobpb.ArtifactResponseWrapper_ResolveArtifactResponse:
+					err := fmt.Errorf("Unexpected ResolveArtifactResponse to GetArtifact: %v", in.GetResponse())
+					slog.Error("GetArtifact failure", err)
+					return err
+				}
+			}
+		}
+	}
+	return nil
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
new file mode 100644
index 00000000000..95b1ce12af9
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package jobservices handles the services necessary for handling jobs
+// submitted from SDKs. Nominally this is the entry point for most users,
+// and covers a job's external interactions outside of pipeline execution.
+//
+// This includes handling receiving, staging, and provisioning artifacts,
+// and orchestrating external workers, such as for loopback mode.
+//
+// Execution of jobs is abstracted away to an execute function specified
+// at server construction time.
+package jobservices
+
+import (
+	"context"
+	"fmt"
+	"sort"
+	"strings"
+	"sync/atomic"
+
+	fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+	jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
+	pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
+	"golang.org/x/exp/slog"
+	"google.golang.org/protobuf/types/known/structpb"
+)
+
+var capabilities = map[string]struct{}{
+	urns.RequirementSplittableDoFn: {},
+}
+
+// TODO, move back to main package, and key off of executor handlers?
+// Accept whole pipeline instead, and look at every PTransform too.
+func isSupported(requirements []string) error {
+	var unsupported []string
+	for _, req := range requirements {
+		if _, ok := capabilities[req]; !ok {
+			unsupported = append(unsupported, req)
+		}
+	}
+	if len(unsupported) > 0 {
+		sort.Strings(unsupported)
+		return fmt.Errorf("local runner doesn't support the following required features: %v", strings.Join(unsupported, ","))
+	}
+	return nil
+}
+
+// Job is an interface to the job services for executing pipelines.
+// It allows the executor to communicate status, messages, and metrics
+// back to callers of the Job Management API.
+type Job struct {
+	key     string
+	jobName string
+
+	Pipeline *pipepb.Pipeline
+	options  *structpb.Struct
+
+	// Management side concerns.
+	msgChan   chan string
+	state     atomic.Value // jobpb.JobState_Enum
+	stateChan chan jobpb.JobState_Enum
+
+	// Context used to terminate this job.
+	RootCtx  context.Context
+	CancelFn context.CancelFunc
+
+	metrics metricsStore
+}
+
+func (j *Job) ContributeMetrics(payloads *fnpb.ProcessBundleResponse) {
+	j.metrics.ContributeMetrics(payloads)
+}
+
+func (j *Job) String() string {
+	return fmt.Sprintf("%v[%v]", j.key, j.jobName)
+}
+
+func (j *Job) LogValue() slog.Value {
+	return slog.GroupValue(
+		slog.String("key", j.key),
+		slog.String("name", j.jobName))
+}
+
+func (j *Job) SendMsg(msg string) {
+	j.msgChan <- msg
+}
+
+// Start indicates that the job is preparing to execute.
+func (j *Job) Start() {
+	j.stateChan <- jobpb.JobState_STARTING
+}
+
+// Running indicates that the job is executing.
+func (j *Job) Running() {
+	j.stateChan <- jobpb.JobState_RUNNING
+}
+
+// Done indicates that the job completed successfully.
+func (j *Job) Done() {
+	j.stateChan <- jobpb.JobState_DONE
+}
+
+// Failed indicates that the job completed unsuccessfully.
+func (j *Job) Failed() {
+	j.stateChan <- jobpb.JobState_FAILED
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
new file mode 100644
index 00000000000..23150d36a9b
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package jobservices
+
+import (
+	"context"
+	"fmt"
+	"sync/atomic"
+
+	jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
+	pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
+	"golang.org/x/exp/slog"
+)
+
+func (s *Server) nextId() string {
+	v := atomic.AddUint32(&s.index, 1)
+	return fmt.Sprintf("job-%03d", v)
+}
+
+func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jobpb.PrepareJobResponse, error) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	// Since jobs execute in the background, they should not be tied to a request's context.
+	rootCtx, cancelFn := context.WithCancel(context.Background())
+	job := &Job{
+		key:      s.nextId(),
+		Pipeline: req.GetPipeline(),
+		jobName:  req.GetJobName(),
+		options:  req.GetPipelineOptions(),
+
+		msgChan:   make(chan string, 100),
+		stateChan: make(chan jobpb.JobState_Enum, 1),
+		RootCtx:   rootCtx,
+		CancelFn:  cancelFn,
+	}
+
+	// Queue initial state of the job.
+	job.state.Store(jobpb.JobState_STOPPED)
+	job.stateChan <- job.state.Load().(jobpb.JobState_Enum)
+
+	if err := isSupported(job.Pipeline.GetRequirements()); err != nil {
+		slog.Error("unable to run job", err, slog.String("jobname", req.GetJobName()))
+		return nil, err
+	}
+	s.jobs[job.key] = job
+	return &jobpb.PrepareJobResponse{
+		PreparationId:       job.key,
+		StagingSessionToken: job.key,
+		ArtifactStagingEndpoint: &pipepb.ApiServiceDescriptor{
+			Url: s.Endpoint(),
+		},
+	}, nil
+}
+
+func (s *Server) Run(ctx context.Context, req *jobpb.RunJobRequest) (*jobpb.RunJobResponse, error) {
+	s.mu.Lock()
+	job := s.jobs[req.GetPreparationId()]
+	s.mu.Unlock()
+
+	// Bring up a background goroutine to allow the job to continue processing.
+	go s.execute(job)
+
+	return &jobpb.RunJobResponse{
+		JobId: job.key,
+	}, nil
+}
+
+// GetMessageStream subscribes to a stream of state changes and messages from the job.
+func (s *Server) GetMessageStream(req *jobpb.JobMessagesRequest, stream jobpb.JobService_GetMessageStreamServer) error {
+	s.mu.Lock()
+	job := s.jobs[req.GetJobId()]
+	s.mu.Unlock()
+
+	for {
+		select {
+		case msg := <-job.msgChan:
+			stream.Send(&jobpb.JobMessagesResponse{
+				Response: &jobpb.JobMessagesResponse_MessageResponse{
+					MessageResponse: &jobpb.JobMessage{
+						MessageText: msg,
+						Importance:  jobpb.JobMessage_JOB_MESSAGE_BASIC,
+					},
+				},
+			})
+
+		case state, ok := <-job.stateChan:
+			// TODO: Don't block job execution if WaitForCompletion isn't being run.
+			// The state channel means the job may only execute if something is observing
+			// the message stream, as the send on the state or message channel may block
+			// once full.
+			// Not a problem for tests or short lived batch, but would be hazardous for
+			// asynchronous jobs.
+
+			// Channel is closed, so the job must be done.
+			if !ok {
+				state = jobpb.JobState_DONE
+			}
+			job.state.Store(state)
+			stream.Send(&jobpb.JobMessagesResponse{
+				Response: &jobpb.JobMessagesResponse_StateResponse{
+					StateResponse: &jobpb.JobStateEvent{
+						State: state,
+					},
+				},
+			})
+			switch state {
+			case jobpb.JobState_CANCELLED, jobpb.JobState_DONE, jobpb.JobState_DRAINED, jobpb.JobState_FAILED, jobpb.JobState_UPDATED:
+				// Reached terminal state.
+				return nil
+			}
+		}
+	}
+
+}
+
+// GetJobMetrics fetches the metrics for a given job.
+func (s *Server) GetJobMetrics(ctx context.Context, req *jobpb.GetJobMetricsRequest) (*jobpb.GetJobMetricsResponse, error) {
+	j := s.getJob(req.GetJobId())
+	if j == nil {
+		return nil, fmt.Errorf("GetJobMetrics: unknown jobID: %v", req.GetJobId())
+	}
+	return &jobpb.GetJobMetricsResponse{
+		Metrics: &jobpb.MetricResults{
+			Attempted: j.metrics.Results(tentative),
+			Committed: j.metrics.Results(committed),
+		},
+	}, nil
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go
index 1dc0723e3af..39936bae72f 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go
@@ -330,8 +330,6 @@ func (m *distributionInt64) accumulate(pyld []byte) error {
 		Min:   ordMin(m.dist.Min, dist.Min),
 		Max:   ordMax(m.dist.Max, dist.Max),
 	}
-	fmt.Println("dist", dist)
-	fmt.Println("m.dist", dist)
 	return nil
 }
 
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go
new file mode 100644
index 00000000000..41df57d6eb8
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package jobservices
+
+import (
+	"fmt"
+	"net"
+	"sync"
+
+	jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
+	"golang.org/x/exp/slog"
+
+	"google.golang.org/grpc"
+)
+
+type Server struct {
+	jobpb.UnimplementedJobServiceServer
+	jobpb.UnimplementedArtifactStagingServiceServer
+
+	// Server management
+	lis    net.Listener
+	server *grpc.Server
+
+	// Job Management
+	mu    sync.Mutex
+	index uint32
+	jobs  map[string]*Job
+
+	// execute defines how a job is executed.
+	execute func(*Job)
+}
+
+// NewServer creates a job services server, listening on the indicated port.
+func NewServer(port int, execute func(*Job)) *Server {
+	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
+	if err != nil {
+		panic(fmt.Sprintf("failed to listen: %v", err))
+	}
+	s := &Server{
+		lis:     lis,
+		jobs:    make(map[string]*Job),
+		execute: execute,
+	}
+	slog.Info("Serving JobManagement", slog.String("endpoint", s.Endpoint()))
+	var opts []grpc.ServerOption
+	s.server = grpc.NewServer(opts...)
+	jobpb.RegisterJobServiceServer(s.server, s)
+	jobpb.RegisterArtifactStagingServiceServer(s.server, s)
+	return s
+}
+
+func (s *Server) getJob(id string) *Job {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.jobs[id]
+}
+
+func (s *Server) Endpoint() string {
+	return s.lis.Addr().String()
+}
+
+// Serve serves on the started listener. Blocks.
+func (s *Server) Serve() {
+	s.server.Serve(s.lis)
+}
+
+// Stop stops the gRPC server gracefully.
+func (s *Server) Stop() {
+	s.server.GracefulStop()
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/server_test.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server_test.go
new file mode 100644
index 00000000000..2223f030ce1
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server_test.go
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package jobservices
+
+import (
+	"context"
+	"sync"
+	"testing"
+
+	jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
+	pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
+	"google.golang.org/protobuf/encoding/prototext"
+)
+
+// TestServer_Lifecycle validates that a server can start and stop.
+func TestServer_Lifecycle(t *testing.T) {
+	undertest := NewServer(0, func(j *Job) {
+		t.Fatalf("unexpected call to execute: %v", j)
+	})
+
+	go undertest.Serve()
+
+	undertest.Stop()
+}
+
+// TestServer_JobLifecycle validates that a job can start and stop.
+func TestServer_JobLifecycle(t *testing.T) {
+	var called sync.WaitGroup
+	called.Add(1)
+	undertest := NewServer(0, func(j *Job) {
+		called.Done()
+	})
+	ctx := context.Background()
+
+	wantPipeline := &pipepb.Pipeline{
+		Requirements: []string{urns.RequirementSplittableDoFn},
+	}
+	wantName := "testJob"
+
+	resp, err := undertest.Prepare(ctx, &jobpb.PrepareJobRequest{
+		Pipeline: wantPipeline,
+		JobName:  wantName,
+	})
+	if err != nil {
+		t.Fatalf("server.Prepare() = %v, want nil", err)
+	}
+
+	if got := resp.GetPreparationId(); got == "" {
+		t.Fatalf("server.Prepare() = returned empty preparation ID, want non-empty: %v", prototext.Format(resp))
+	}
+
+	runResp, err := undertest.Run(ctx, &jobpb.RunJobRequest{
+		PreparationId: resp.GetPreparationId(),
+	})
+	if err != nil {
+		t.Fatalf("server.Run() = %v, want nil", err)
+	}
+	if got := runResp.GetJobId(); got == "" {
+		t.Fatalf("server.Run() = returned empty preparation ID, want non-empty")
+	}
+	// If execute is never called, this doesn't unblock and the test times out.
+	called.Wait()
+	t.Log("success!")
+	// Nothing to clean up because we didn't start the server.
+}
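
The tests above drive the server in-process. Over the wire, the same Prepare/Run flow would look roughly like the sketch below; the endpoint value and the empty pipeline are placeholders and error handling is trimmed, so treat this as an assumption-laden illustration rather than part of the change.

```go
package main

import (
	"context"
	"log"

	jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
	pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// Assumed: the address reported by srv.Endpoint() on the serving side.
	conn, err := grpc.Dial("localhost:12345", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("dial: %v", err)
	}
	defer conn.Close()

	ctx := context.Background()
	client := jobpb.NewJobServiceClient(conn)

	// Prepare registers the pipeline and returns a preparation ID.
	prep, err := client.Prepare(ctx, &jobpb.PrepareJobRequest{
		Pipeline: &pipepb.Pipeline{}, // placeholder pipeline
		JobName:  "example",
	})
	if err != nil {
		log.Fatalf("Prepare: %v", err)
	}

	// Run starts the prepared job; the server then invokes its execute callback.
	run, err := client.Run(ctx, &jobpb.RunJobRequest{PreparationId: prep.GetPreparationId()})
	if err != nil {
		log.Fatalf("Run: %v", err)
	}
	log.Printf("job started: %v", run.GetJobId())
}
```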


[beam] 12/13: [#24789][prism] add preprocessor and test (#25520)

Posted by lo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch prism-jobservices
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 2cb402ad92a17ef319a3823a6039978196a99092
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Sun Feb 19 16:43:48 2023 -0800

    [#24789][prism] add preprocessor and test (#25520)
    
    * [prism] add preprocessor and test
    
    * [prism] preparer comment
    
    * [prism] move preparer
    
    ---------
    
    Co-authored-by: lostluck <13...@users.noreply.github.com>
---
 sdks/go/pkg/beam/runners/prism/internal/execute.go |  22 +++
 .../pkg/beam/runners/prism/internal/preprocess.go  | 148 +++++++++++++++++
 .../beam/runners/prism/internal/preprocess_test.go | 181 +++++++++++++++++++++
 3 files changed, 351 insertions(+)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go
new file mode 100644
index 00000000000..b685df63cf6
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -0,0 +1,22 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+// stage represents a fused subgraph.
+// temporary implementation to break up PRs.
+type stage struct {
+	transforms []string
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go
new file mode 100644
index 00000000000..8769a05d38f
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+	"sort"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/pipelinex"
+	pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
+	"golang.org/x/exp/maps"
+	"golang.org/x/exp/slog"
+)
+
+// transformPreparer is an interface for handling different URNs in the preprocessor,
+// largely for exchanging transforms for other ones that are then added to the complete
+// set of components in the pipeline.
+type transformPreparer interface {
+	// PrepareUrns returns the Beam URNs that this handler deals with for preprocessing.
+	PrepareUrns() []string
+	// PrepareTransform takes a PTransform proto and returns a set of new Components, and a list of
+	// transform IDs (leaves) to remove and ignore from graph processing.
+	PrepareTransform(tid string, t *pipepb.PTransform, comps *pipepb.Components) (*pipepb.Components, []string)
+}
+
+// preprocessor retains configuration for preprocessing the
+// graph, such as special handling for lifted combiners or
+// other configuration.
+type preprocessor struct {
+	transformPreparers map[string]transformPreparer
+}
+
+func newPreprocessor(preps []transformPreparer) *preprocessor {
+	preparers := map[string]transformPreparer{}
+	for _, prep := range preps {
+		for _, urn := range prep.PrepareUrns() {
+			preparers[urn] = prep
+		}
+	}
+	return &preprocessor{
+		transformPreparers: preparers,
+	}
+}
+
+// preProcessGraph takes the graph and preprocesses for consumption in bundles.
+// The output is the topological sort of the transform ids.
+//
+// This captures how transforms relate to each other in graph form, but not the specific bundles themselves, which come later.
+//
+// Handles awareness of composite transforms and similar. Ultimately, after this point
+// the graph stops being a hypergraph, with composite transforms being treated as
+// "leaves" downstream as needed.
+//
+// This is where Combines are lifted (when it makes sense, or when configured), and similar behaviors are applied.
+func (p *preprocessor) preProcessGraph(comps *pipepb.Components) []*stage {
+	ts := comps.GetTransforms()
+
+	// TODO move this out of this part of the pre-processor?
+	leaves := map[string]struct{}{}
+	ignore := map[string]struct{}{}
+	for tid, t := range ts {
+		if _, ok := ignore[tid]; ok {
+			continue
+		}
+
+		spec := t.GetSpec()
+		if spec == nil {
+			// Most composites don't have specs.
+			slog.Debug("transform is missing a spec",
+				slog.Group("transform", slog.String("ID", tid), slog.String("name", t.GetUniqueName())))
+			continue
+		}
+
+		// Composite Transforms basically means needing to remove the "leaves" from the
+		// handling set, and producing the new sub component transforms. The top level
+		// composite should have enough information to produce the new sub transforms.
+		// In particular, the inputs and outputs need to all be connected and matched up
+		// so the topological sort still works out.
+		h := p.transformPreparers[spec.GetUrn()]
+		if h == nil {
+
+			// If there's an unknown urn, and it's not composite, simply add it to the leaves.
+			if len(t.GetSubtransforms()) == 0 {
+				leaves[tid] = struct{}{}
+			} else {
+				slog.Info("composite transform has unknown urn",
+					slog.Group("transform", slog.String("ID", tid),
+						slog.String("name", t.GetUniqueName()),
+						slog.String("urn", spec.GetUrn())))
+			}
+			continue
+		}
+
+		subs, toRemove := h.PrepareTransform(tid, t, comps)
+
+		// Clear out unnecessary leaves from this composite for topological sort handling.
+		for _, key := range toRemove {
+			ignore[key] = struct{}{}
+			delete(leaves, key)
+		}
+
+		// ts should be a clone, so we should be able to add new transforms into the map.
+		for tid, t := range subs.GetTransforms() {
+			leaves[tid] = struct{}{}
+			ts[tid] = t
+		}
+		for cid, c := range subs.GetCoders() {
+			comps.GetCoders()[cid] = c
+		}
+		for nid, n := range subs.GetPcollections() {
+			comps.GetPcollections()[nid] = n
+		}
+		// It's unlikely for these to change, but better to handle them now, to save a headache later.
+		for wid, w := range subs.GetWindowingStrategies() {
+			comps.GetWindowingStrategies()[wid] = w
+		}
+		for envid, env := range subs.GetEnvironments() {
+			comps.GetEnvironments()[envid] = env
+		}
+	}
+
+	// Extract URNs for the given transform.
+
+	keptLeaves := maps.Keys(leaves)
+	sort.Strings(keptLeaves)
+	topological := pipelinex.TopologicalSort(ts, keptLeaves)
+	slog.Debug("topological transform ordering", topological)
+
+	var stages []*stage
+	for _, tid := range topological {
+		stages = append(stages, &stage{
+			transforms: []string{tid},
+		})
+	}
+	return stages
+}
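
To make the pass-through behavior concrete before the table-driven tests below, here is a small sketch of an additional test as it might appear in this package; the "read"/"parse" graph is invented for illustration and is not part of the commit.

```go
package internal

import (
	"testing"

	pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
)

// With no transformPreparers registered, unknown leaf transforms are kept
// as-is and come back as single-transform stages in topological order.
func TestPreprocess_topologicalPassthrough(t *testing.T) {
	pre := newPreprocessor(nil)

	comps := &pipepb.Components{
		Transforms: map[string]*pipepb.PTransform{
			"read": {
				UniqueName: "read",
				Spec:       &pipepb.FunctionSpec{Urn: "example:read"},
				Outputs:    map[string]string{"o0": "pc1"},
			},
			"parse": {
				UniqueName: "parse",
				Spec:       &pipepb.FunctionSpec{Urn: "example:parse"},
				Inputs:     map[string]string{"i0": "pc1"},
			},
		},
	}

	stages := pre.preProcessGraph(comps)

	// "read" produces pc1 and "parse" consumes it, so "read" must sort first.
	if len(stages) != 2 || stages[0].transforms[0] != "read" || stages[1].transforms[0] != "parse" {
		t.Errorf("unexpected stages: %v", stages)
	}
}
```
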
diff --git a/sdks/go/pkg/beam/runners/prism/internal/preprocess_test.go b/sdks/go/pkg/beam/runners/prism/internal/preprocess_test.go
new file mode 100644
index 00000000000..add69a7c767
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/preprocess_test.go
@@ -0,0 +1,181 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+	"testing"
+
+	pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
+	"github.com/google/go-cmp/cmp"
+	"google.golang.org/protobuf/testing/protocmp"
+)
+
+func Test_preprocessor_preProcessGraph(t *testing.T) {
+	tests := []struct {
+		name  string
+		input *pipepb.Components
+
+		wantComponents *pipepb.Components
+		wantStages     []*stage
+	}{
+		{
+			name: "noPreparer",
+			input: &pipepb.Components{
+				Transforms: map[string]*pipepb.PTransform{
+					"e1": {
+						UniqueName: "e1",
+						Spec: &pipepb.FunctionSpec{
+							Urn: "defaultUrn",
+						},
+					},
+				},
+			},
+
+			wantStages: []*stage{{transforms: []string{"e1"}}},
+			wantComponents: &pipepb.Components{
+				Transforms: map[string]*pipepb.PTransform{
+					"e1": {
+						UniqueName: "e1",
+						Spec: &pipepb.FunctionSpec{
+							Urn: "defaultUrn",
+						},
+					},
+				},
+			},
+		}, {
+			name: "preparer",
+			input: &pipepb.Components{
+				Transforms: map[string]*pipepb.PTransform{
+					"e1": {
+						UniqueName: "e1",
+						Spec: &pipepb.FunctionSpec{
+							Urn: "test_urn",
+						},
+					},
+				},
+				// Initialize maps because proto unmarshalling always initializes them.
+				Pcollections:        map[string]*pipepb.PCollection{},
+				WindowingStrategies: map[string]*pipepb.WindowingStrategy{},
+				Coders:              map[string]*pipepb.Coder{},
+				Environments:        map[string]*pipepb.Environment{},
+			},
+
+			wantStages: []*stage{{transforms: []string{"e1_early"}}, {transforms: []string{"e1_late"}}},
+			wantComponents: &pipepb.Components{
+				Transforms: map[string]*pipepb.PTransform{
+					// Original is always kept
+					"e1": {
+						UniqueName: "e1",
+						Spec: &pipepb.FunctionSpec{
+							Urn: "test_urn",
+						},
+					},
+					"e1_early": {
+						UniqueName: "e1_early",
+						Spec: &pipepb.FunctionSpec{
+							Urn: "defaultUrn",
+						},
+						Outputs:       map[string]string{"i0": "pcol1"},
+						EnvironmentId: "env1",
+					},
+					"e1_late": {
+						UniqueName: "e1_late",
+						Spec: &pipepb.FunctionSpec{
+							Urn: "defaultUrn",
+						},
+						Inputs:        map[string]string{"i0": "pcol1"},
+						EnvironmentId: "env1",
+					},
+				},
+				Pcollections: map[string]*pipepb.PCollection{
+					"pcol1": {
+						UniqueName:          "pcol1",
+						CoderId:             "coder1",
+						WindowingStrategyId: "ws1",
+					},
+				},
+				Coders: map[string]*pipepb.Coder{
+					"coder1": {Spec: &pipepb.FunctionSpec{Urn: "coder1"}},
+				},
+				WindowingStrategies: map[string]*pipepb.WindowingStrategy{
+					"ws1": {WindowCoderId: "global"},
+				},
+				Environments: map[string]*pipepb.Environment{
+					"env1": {Urn: "env1"},
+				},
+			},
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			pre := newPreprocessor([]transformPreparer{&testPreparer{}})
+
+			gotStages := pre.preProcessGraph(test.input)
+			if diff := cmp.Diff(test.wantStages, gotStages, cmp.AllowUnexported(stage{})); diff != "" {
+				t.Errorf("preProcessGraph(%q) stages diff (-want,+got)\n%v", test.name, diff)
+			}
+
+			if diff := cmp.Diff(test.wantComponents, test.input, protocmp.Transform()); diff != "" {
+				t.Errorf("preProcessGraph(%q) components diff (-want,+got)\n%v", test.name, diff)
+			}
+		})
+	}
+}
+
+type testPreparer struct{}
+
+func (p *testPreparer) PrepareUrns() []string {
+	return []string{"test_urn"}
+}
+
+func (p *testPreparer) PrepareTransform(tid string, t *pipepb.PTransform, comps *pipepb.Components) (*pipepb.Components, []string) {
+	return &pipepb.Components{
+		Transforms: map[string]*pipepb.PTransform{
+			"e1_early": {
+				UniqueName: "e1_early",
+				Spec: &pipepb.FunctionSpec{
+					Urn: "defaultUrn",
+				},
+				Outputs:       map[string]string{"i0": "pcol1"},
+				EnvironmentId: "env1",
+			},
+			"e1_late": {
+				UniqueName: "e1_late",
+				Spec: &pipepb.FunctionSpec{
+					Urn: "defaultUrn",
+				},
+				Inputs:        map[string]string{"i0": "pcol1"},
+				EnvironmentId: "env1",
+			},
+		},
+		Pcollections: map[string]*pipepb.PCollection{
+			"pcol1": {
+				UniqueName:          "pcol1",
+				CoderId:             "coder1",
+				WindowingStrategyId: "ws1",
+			},
+		},
+		Coders: map[string]*pipepb.Coder{
+			"coder1": {Spec: &pipepb.FunctionSpec{Urn: "coder1"}},
+		},
+		WindowingStrategies: map[string]*pipepb.WindowingStrategy{
+			"ws1": {WindowCoderId: "global"},
+		},
+		Environments: map[string]*pipepb.Environment{
+			"env1": {Urn: "env1"},
+		},
+	}, []string{"e1"}
+}


[beam] 01/13: Stop use of self hosted runners for some workflows. (#25542)

Posted by lo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch prism-jobservices
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 0dc99edc01c9e1f10e4d7eb8072edb62fc651b39
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Fri Feb 17 12:08:16 2023 -0800

    Stop use of self hosted runners for some workflows. (#25542)
    
    Co-authored-by: lostluck <13...@users.noreply.github.com>
---
 .github/workflows/git_tag_released_version.yml     | 2 +-
 .github/workflows/go_tests.yml                     | 2 +-
 .github/workflows/issue-tagger.yml                 | 2 +-
 .github/workflows/label_prs.yml                    | 2 +-
 .github/workflows/pr-bot-new-prs.yml               | 2 +-
 .github/workflows/pr-bot-prs-needing-attention.yml | 2 +-
 .github/workflows/publish_github_release_notes.yml | 4 ++--
 .github/workflows/reportGenerator.yml              | 2 +-
 .github/workflows/self-assign.yml                  | 2 +-
 .github/workflows/stale.yml                        | 2 +-
 .github/workflows/triaged-on-assign.yml            | 2 +-
 11 files changed, 12 insertions(+), 12 deletions(-)

diff --git a/.github/workflows/git_tag_released_version.yml b/.github/workflows/git_tag_released_version.yml
index 37f2c390b7f..871149bd26a 100644
--- a/.github/workflows/git_tag_released_version.yml
+++ b/.github/workflows/git_tag_released_version.yml
@@ -32,7 +32,7 @@ on:
 
 jobs:
   generate_tags:
-    runs-on: [self-hosted, ubuntu-20.04]
+    runs-on: ubuntu-latest
     env:
       VERSION_PATH: ${{ github.event.inputs.VERSION_TAG }}
     steps:
diff --git a/.github/workflows/go_tests.yml b/.github/workflows/go_tests.yml
index 22d613ba99a..fc772eb1ab2 100644
--- a/.github/workflows/go_tests.yml
+++ b/.github/workflows/go_tests.yml
@@ -35,7 +35,7 @@ concurrency:
   cancel-in-progress: true
 jobs:
   build:
-    runs-on: [self-hosted, ubuntu-20.04]
+    runs-on: ubuntu-latest
     name: Go Build
     steps:
       - name: Check out code
diff --git a/.github/workflows/issue-tagger.yml b/.github/workflows/issue-tagger.yml
index 7dbb4bf2d5d..39f92d87f78 100644
--- a/.github/workflows/issue-tagger.yml
+++ b/.github/workflows/issue-tagger.yml
@@ -20,7 +20,7 @@ on:
 
 jobs:
   label:
-    runs-on: [self-hosted, ubuntu-20.04]
+    runs-on: ubuntu-latest
     permissions:
       issues: write
     steps:
diff --git a/.github/workflows/label_prs.yml b/.github/workflows/label_prs.yml
index 02e2207e39a..aa04506f2bf 100644
--- a/.github/workflows/label_prs.yml
+++ b/.github/workflows/label_prs.yml
@@ -21,7 +21,7 @@ on: [pull_request_target]
 permissions: read-all
 jobs:
   label:
-    runs-on: [self-hosted, ubuntu-20.04]
+    runs-on: ubuntu-latest
     permissions:
       contents: read
       pull-requests: write
diff --git a/.github/workflows/pr-bot-new-prs.yml b/.github/workflows/pr-bot-new-prs.yml
index b511367e672..8ba27fbec3d 100644
--- a/.github/workflows/pr-bot-new-prs.yml
+++ b/.github/workflows/pr-bot-new-prs.yml
@@ -31,7 +31,7 @@ jobs:
       statuses: read
     # Don't run on forks
     if: github.repository == 'apache/beam'
-    runs-on: [self-hosted, ubuntu-20.04]
+    runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v3
       - name: Setup Node
diff --git a/.github/workflows/pr-bot-prs-needing-attention.yml b/.github/workflows/pr-bot-prs-needing-attention.yml
index dd7e47fd487..e96d3983746 100644
--- a/.github/workflows/pr-bot-prs-needing-attention.yml
+++ b/.github/workflows/pr-bot-prs-needing-attention.yml
@@ -31,7 +31,7 @@ jobs:
       statuses: read
     # Don't run on forks
     if: github.repository == 'apache/beam'
-    runs-on: [self-hosted, ubuntu-20.04]
+    runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v3
       - name: Setup Node
diff --git a/.github/workflows/publish_github_release_notes.yml b/.github/workflows/publish_github_release_notes.yml
index c8569ab4fc5..246ce690f8b 100644
--- a/.github/workflows/publish_github_release_notes.yml
+++ b/.github/workflows/publish_github_release_notes.yml
@@ -31,7 +31,7 @@ permissions: read-all
 
 jobs:
   set-properties:
-    runs-on: [self-hosted, ubuntu-20.04]
+    runs-on: ubuntu-latest
     outputs:
       properties: ${{ steps.test-properties.outputs.properties }}
     steps:
@@ -41,7 +41,7 @@ jobs:
         uses: ./.github/actions/setup-default-test-properties
 
   publish_github_release_notes:
-    runs-on: [self-hosted, ubuntu-20.04]
+    runs-on: ubuntu-latest
     needs: set-properties
     env:
       GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
diff --git a/.github/workflows/reportGenerator.yml b/.github/workflows/reportGenerator.yml
index 97a6056bc4c..44055cd5631 100644
--- a/.github/workflows/reportGenerator.yml
+++ b/.github/workflows/reportGenerator.yml
@@ -24,7 +24,7 @@ on:
 jobs:
   assign:
     name: Generate issue report
-    runs-on: [self-hosted, ubuntu-20.04]
+    runs-on: ubuntu-latest
     steps:
     - uses: actions/checkout@v3
     - name: Setup Node
diff --git a/.github/workflows/self-assign.yml b/.github/workflows/self-assign.yml
index 1afe7fbe430..c6b7cc69ce9 100644
--- a/.github/workflows/self-assign.yml
+++ b/.github/workflows/self-assign.yml
@@ -23,7 +23,7 @@ jobs:
       issues: write
     name: Take or close an issue
     if: ${{ !github.event.issue.pull_request }}
-    runs-on: [self-hosted, ubuntu-20.04]
+    runs-on: ubuntu-latest
     steps:
     - uses: actions/github-script@v6
       with:
diff --git a/.github/workflows/stale.yml b/.github/workflows/stale.yml
index 709c5dddfd2..fa2303a931e 100644
--- a/.github/workflows/stale.yml
+++ b/.github/workflows/stale.yml
@@ -21,7 +21,7 @@ on:
 permissions: read-all
 jobs:
   stale:
-    runs-on: [self-hosted, ubuntu-20.04]
+    runs-on: ubuntu-latest
     permissions:
       issues: write
       pull-requests: write
diff --git a/.github/workflows/triaged-on-assign.yml b/.github/workflows/triaged-on-assign.yml
index bebfb9cc5eb..188b53a8696 100644
--- a/.github/workflows/triaged-on-assign.yml
+++ b/.github/workflows/triaged-on-assign.yml
@@ -22,7 +22,7 @@ jobs:
     permissions:
       issues: write
     name: Mark issue as triaged when assigned
-    runs-on: [self-hosted, ubuntu-20.04]
+    runs-on: ubuntu-latest
     steps:
     - run: |
         ISSUE_NUMBER="$(jq '.issue.number' $GITHUB_EVENT_PATH)"