Posted to github@beam.apache.org by "lostluck (via GitHub)" <gi...@apache.org> on 2023/02/20 01:52:03 UTC

[GitHub] [beam] lostluck opened a new pull request, #25558: [prism] Handlers for combine, ParDo, GBK, Flatten

lostluck opened a new pull request, #25558:
URL: https://github.com/apache/beam/pull/25558

   Adds the handlers for Combine, ParDo, GBK, and Flatten.
   Also adds an executor interface.
   
   There are no unit tests, largely because these are mostly rote Proto transformations that are best tested in the pipeline context. They'll be fully covered in the completed internal package.
   
   See https://github.com/apache/beam/issues/24789 for more information.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] brucearctor commented on pull request #25558: [#24789][prism] Handlers for combine, ParDo, GBK, Flatten

Posted by "brucearctor (via GitHub)" <gi...@apache.org>.
brucearctor commented on PR #25558:
URL: https://github.com/apache/beam/pull/25558#issuecomment-1436391698

   LGTM - but I've only been skimming the code here [ and https://github.com/apache/beam/pull/25556 and https://github.com/apache/beam/pull/25557 ... and your other related/previous PRs ].  Fantastic contribution, so glad you've gotten it this far already!  




[GitHub] [beam] codecov[bot] commented on pull request #25558: [prism] Handlers for combine, ParDo, GBK, Flatten

Posted by "codecov[bot] (via GitHub)" <gi...@apache.org>.
codecov[bot] commented on PR #25558:
URL: https://github.com/apache/beam/pull/25558#issuecomment-1436189714

   # [Codecov](https://codecov.io/gh/apache/beam/pull/25558?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#25558](https://codecov.io/gh/apache/beam/pull/25558?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (c2ca697) into [master](https://codecov.io/gh/apache/beam/commit/6667eb4741bad1bad199b61012476d62ea3d7e27?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6667eb4) will **decrease** coverage by `0.32%`.
   > The diff coverage is `0.00%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #25558      +/-   ##
   ==========================================
   - Coverage   72.64%   72.32%   -0.32%     
   ==========================================
     Files         763      766       +3     
     Lines      101060   101494     +434     
   ==========================================
     Hits        73410    73410              
   - Misses      26228    26662     +434     
     Partials     1422     1422              
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | go | `51.61% <0.00%> (-0.71%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/25558?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...o/pkg/beam/runners/prism/internal/handlecombine.go](https://codecov.io/gh/apache/beam/pull/25558?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9ydW5uZXJzL3ByaXNtL2ludGVybmFsL2hhbmRsZWNvbWJpbmUuZ28=) | `0.00% <0.00%> (ø)` | |
   | [.../go/pkg/beam/runners/prism/internal/handlepardo.go](https://codecov.io/gh/apache/beam/pull/25558?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9ydW5uZXJzL3ByaXNtL2ludGVybmFsL2hhbmRsZXBhcmRvLmdv) | `0.00% <0.00%> (ø)` | |
   | [...go/pkg/beam/runners/prism/internal/handlerunner.go](https://codecov.io/gh/apache/beam/pull/25558?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9ydW5uZXJzL3ByaXNtL2ludGVybmFsL2hhbmRsZXJ1bm5lci5nbw==) | `0.00% <0.00%> (ø)` | |
   
   




[GitHub] [beam] lostluck commented on pull request #25558: [prism] Handlers for combine, ParDo, GBK, Flatten

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on PR #25558:
URL: https://github.com/apache/beam/pull/25558#issuecomment-1436198176

   R: @johannaojeling 
   
   This is where things get interesting! This converts/prepares the Pipeline Proto representations into fully expanded alternatives and the like. You'll probably find the comments on how SDFs and Combine composites are expanded interesting.
   
   GroupByKeys and Flattens are pretty straightforward so far and do the appropriate in-memory work. That would change depending on how data services evolve (e.g., if they stop being strictly in memory). But as it is, this is an OK abstraction to start with.
   
   The code living in an internal package gives us the freedom to evolve the abstractions we're using as needed.
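
As a rough illustration of the "in-memory work" mentioned above, here is a minimal sketch of a Flatten followed by a GroupByKey done entirely in memory. It is not Prism's implementation: the `kv` type and the `flatten`, `groupByKey`, and `sortedKeys` helpers are hypothetical, and the sketch ignores coders, windows, and timestamps, all of which a real runner has to handle.

```go
package main

import (
	"fmt"
	"sort"
)

// kv is a hypothetical, already-decoded key/value element.
// A real runner works on encoded bytes plus windows and timestamps.
type kv struct {
	Key   string
	Value int
}

// flatten concatenates any number of input collections into one.
func flatten(inputs ...[]kv) []kv {
	var out []kv
	for _, in := range inputs {
		out = append(out, in...)
	}
	return out
}

// groupByKey buckets values by key, holding everything in memory.
func groupByKey(elems []kv) map[string][]int {
	grouped := make(map[string][]int)
	for _, e := range elems {
		grouped[e.Key] = append(grouped[e.Key], e.Value)
	}
	return grouped
}

// sortedKeys returns the keys in a deterministic order for printing.
func sortedKeys(m map[string][]int) []string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	a := []kv{{"a", 1}, {"b", 2}}
	b := []kv{{"a", 3}}
	grouped := groupByKey(flatten(a, b))
	for _, k := range sortedKeys(grouped) {
		fmt.Println(k, grouped[k])
	}
}
```

Because the grouping lives in a plain map, this approach only holds while all data fits in memory, which is the limitation the comment above alludes to.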




[GitHub] [beam] lostluck commented on a diff in pull request #25558: [#24789][prism] Handlers for combine, ParDo, GBK, Flatten

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on code in PR #25558:
URL: https://github.com/apache/beam/pull/25558#discussion_r1112181097


##########
sdks/go/pkg/beam/runners/prism/internal/handlerunner.go:
##########
@@ -0,0 +1,298 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+	"reflect"
+	"sort"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+	pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/worker"
+	"golang.org/x/exp/slog"
+	"google.golang.org/protobuf/encoding/prototext"
+	"google.golang.org/protobuf/proto"
+)
+
+// This file retains the logic for the pardo handler
+
+// RunnerCharacteristic holds the configuration for Runner based transforms,
+// such as GBKs, Flattens.
+type RunnerCharacteristic struct {
+	SDKFlatten bool // Sets whether we should force an SDK side flatten.
+	SDKGBK     bool // Sets whether the GBK should be handled by the SDK, if possible by the SDK.
+}
+
+func Runner(config any) *runner {

Review Comment:
   In a public API, returning unexported types is annoying for users, but it's less so under the isolation of the internal folder. It's likely the handlers will end up in their own package, and get exported out that way, with the types changed as appropriate.
   
   While I've got the interfaces (preparer, executer), they're there to reduce the chance the abstraction will be broken. They'll be refined as other features are added to Prism, like Fusion and handling the TestStream and PubSub URNs (and more).
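
To make the trade-off concrete, here is a minimal sketch of one direction the handlers could take if they were exported later: the constructor returns an interface rather than the unexported struct, and a compile-time assertion guards the abstraction. This is an assumption for illustration, not the PR's code. `NewRunner` is a hypothetical name, the one-method `transformExecuter` here is a simplified stand-in (the real interface's method set is not shown in this thread), and the URN strings are written out literally instead of coming from the `urns` package.

```go
package main

import "fmt"

// RunnerCharacteristic mirrors the config struct from the diff.
type RunnerCharacteristic struct {
	SDKFlatten bool
	SDKGBK     bool
}

// transformExecuter is a simplified stand-in for the interface of the
// same name in the PR; its real method set is not shown in this thread.
type transformExecuter interface {
	ExecuteUrns() []string
}

// runner stays unexported, as in the PR.
type runner struct {
	config RunnerCharacteristic
}

// Compile-time check that *runner satisfies the interface.
var _ transformExecuter = (*runner)(nil)

func (*runner) ExecuteUrns() []string {
	return []string{"beam:transform:flatten:v1", "beam:transform:group_by_key:v1"}
}

// NewRunner is a hypothetical variant of Runner. It returns the interface
// and reports a wrong config type as an error instead of panicking.
func NewRunner(config any) (transformExecuter, error) {
	c, ok := config.(RunnerCharacteristic)
	if !ok {
		return nil, fmt.Errorf("runner: want RunnerCharacteristic, got %T", config)
	}
	return &runner{config: c}, nil
}

func main() {
	h, err := NewRunner(RunnerCharacteristic{SDKGBK: true})
	if err != nil {
		fmt.Println("unexpected error:", err)
		return
	}
	fmt.Println(h.ExecuteUrns())

	_, err = NewRunner("not a config")
	fmt.Println(err)
}
```

Returning the interface keeps callers outside the package from depending on the concrete type, at the cost of hiding any methods that are not part of the interface.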





[GitHub] [beam] lostluck merged pull request #25558: [#24789][prism] Handlers for combine, ParDo, GBK, Flatten

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck merged PR #25558:
URL: https://github.com/apache/beam/pull/25558




[GitHub] [beam] github-actions[bot] commented on pull request #25558: [prism] Handlers for combine, ParDo, GBK, Flatten

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #25558:
URL: https://github.com/apache/beam/pull/25558#issuecomment-1436199260

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control




[GitHub] [beam] johannaojeling commented on a diff in pull request #25558: [#24789][prism] Handlers for combine, ParDo, GBK, Flatten

Posted by "johannaojeling (via GitHub)" <gi...@apache.org>.
johannaojeling commented on code in PR #25558:
URL: https://github.com/apache/beam/pull/25558#discussion_r1112050138


##########
sdks/go/pkg/beam/runners/prism/internal/handlerunner.go:
##########
@@ -0,0 +1,298 @@
+func Runner(config any) *runner {
+	return &runner{config: config.(RunnerCharacteristic)}
+}
+
+// runner represents an instance of the runner transform handler.
+type runner struct {
+	config RunnerCharacteristic
+}
+
+// ConfigURN returns the name for combine in the configuration file.
+func (*runner) ConfigURN() string {
+	return "runner"
+}
+
+func (*runner) ConfigCharacteristic() reflect.Type {
+	return reflect.TypeOf((*RunnerCharacteristic)(nil)).Elem()
+}
+
+var _ transformExecuter = (*runner)(nil)
+
+func (*runner) ExecuteUrns() []string {
+	return []string{urns.TransformFlatten, urns.TransformGBK}
+}
+
+// ExecuteWith returns what environment the
+func (h *runner) ExecuteWith(t *pipepb.PTransform) string {
+	urn := t.GetSpec().GetUrn()
+	if urn == urns.TransformFlatten && !h.config.SDKFlatten {
+		return ""
+	}
+	if urn == urns.TransformGBK && !h.config.SDKGBK {
+		return ""
+	}
+	return t.GetEnvironmentId()
+}
+
+// ExecTransform handles special processing with respect to runner specific transforms

Review Comment:
   ```suggestion
   // ExecuteTransform handles special processing with respect to runner specific transforms
   ```



##########
sdks/go/pkg/beam/runners/prism/internal/handlepardo.go:
##########
@@ -0,0 +1,242 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+	"fmt"
+	"reflect"
+
+	pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
+	"golang.org/x/exp/maps"
+	"google.golang.org/protobuf/proto"
+)
+
+// This file retains the logic for the pardo handler
+
+// ParDoCharacteristic holds the configuration for ParDos.
+type ParDoCharacteristic struct {
+	DisableSDF bool // Sets whether a pardo supports SDFs or not.
+}
+
+func ParDo(config any) *pardo {
+	return &pardo{config: config.(ParDoCharacteristic)}
+}
+
+// pardo represents an instance of the pardo handler.
+type pardo struct {
+	config ParDoCharacteristic
+}
+
+// ConfigURN returns the name for combine in the configuration file.
+func (*pardo) ConfigURN() string {
+	return "pardo"
+}
+
+func (*pardo) ConfigCharacteristic() reflect.Type {
+	return reflect.TypeOf((*ParDoCharacteristic)(nil)).Elem()
+}
+
+var _ transformPreparer = (*pardo)(nil)
+
+func (*pardo) PrepareUrns() []string {
+	return []string{urns.TransformParDo}
+}
+
+// PrepareTransform handles special processing with respect to ParDos, since their handling is dependant on supported features
+// and requirements.
+func (h *pardo) PrepareTransform(tid string, t *pipepb.PTransform, comps *pipepb.Components) (*pipepb.Components, []string) {
+
+	// ParDos are a pain in the butt.
+	// Combines, by comparison, are dramatically simpler.
+	// This is because for ParDos, how they are handled, and what kinds of transforms are in
+	// and around the ParDo, the actual shape of the graph will change.
+	// At their simplest, it's something a DoFn will handle on their own.
+	// At their most complex, they require intimate interaction with the subgraph
+	// bundling process, the data layer, state layers, and control layers.
+	// But unlike combines, which have a clear urn for composite + special payload,
+	// ParDos have the standard URN for composites with the standard payload.
+	// So always, we need to first unmarshal the payload.
+
+	pardoPayload := t.GetSpec().GetPayload()
+	pdo := &pipepb.ParDoPayload{}
+	if err := (proto.UnmarshalOptions{}).Unmarshal(pardoPayload, pdo); err != nil {
+		panic(fmt.Sprintf("unable to decode ParDoPayload for transform[%v]", t.GetUniqueName()))
+	}
+
+	// Lets check for and remove anything that makes things less simple.
+	if pdo.OnWindowExpirationTimerFamilySpec == "" &&
+		!pdo.RequestsFinalization &&
+		!pdo.RequiresStableInput &&
+		!pdo.RequiresTimeSortedInput &&
+		//	len(pdo.SideInputs) == 0 &&
+		len(pdo.StateSpecs) == 0 &&
+		len(pdo.TimerFamilySpecs) == 0 &&
+		pdo.RestrictionCoderId == "" {
+		// At their simplest, we don't need to do anything special at pre-processing time, and simply pass through as normal.
+		return &pipepb.Components{
+			Transforms: map[string]*pipepb.PTransform{
+				tid: t,
+			},
+		}, nil
+	}
+
+	// Side inputs add to topology and make fusion harder to deal with

Review Comment:
   You're right, I appreciate these comments!



##########
sdks/go/pkg/beam/runners/prism/internal/handlepardo.go:
##########
@@ -0,0 +1,242 @@
+	// Lets check for and remove anything that makes things less simple.
+	if pdo.OnWindowExpirationTimerFamilySpec == "" &&
+		!pdo.RequestsFinalization &&
+		!pdo.RequiresStableInput &&
+		!pdo.RequiresTimeSortedInput &&
+		//	len(pdo.SideInputs) == 0 &&

Review Comment:
   Should this still be here?



##########
sdks/go/pkg/beam/runners/prism/internal/handlerunner.go:
##########
@@ -0,0 +1,298 @@
+func Runner(config any) *runner {

Review Comment:
   The `Combine()`, `ParDo()` and `Runner()` functions are exported but return unexported types. Is that intentional?





[GitHub] [beam] lostluck commented on a diff in pull request #25558: [#24789][prism] Handlers for combine, ParDo, GBK, Flatten

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on code in PR #25558:
URL: https://github.com/apache/beam/pull/25558#discussion_r1112177342


##########
sdks/go/pkg/beam/runners/prism/internal/handlepardo.go:
##########
@@ -0,0 +1,242 @@
+	// Lets check for and remove anything that makes things less simple.
+	if pdo.OnWindowExpirationTimerFamilySpec == "" &&
+		!pdo.RequestsFinalization &&
+		!pdo.RequiresStableInput &&
+		!pdo.RequiresTimeSortedInput &&
+		//	len(pdo.SideInputs) == 0 &&

Review Comment:
   No. I have a comment elsewhere about side inputs.
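
On one reading of this exchange, the commented-out side-input check is meant to go away, leaving a "simple ParDo" condition that ignores side inputs. The sketch below shows that condition as a standalone predicate. It is an illustration only: `parDoPayload` is a hypothetical stand-in for `pipepb.ParDoPayload` that keeps just the fields the quoted condition inspects, the map value types are placeholders, and `isSimpleParDo` is a name invented here.

```go
package main

import "fmt"

// parDoPayload is a hypothetical stand-in for pipepb.ParDoPayload.
// The map value types are placeholders, not the real proto messages.
type parDoPayload struct {
	OnWindowExpirationTimerFamilySpec string
	RequestsFinalization              bool
	RequiresStableInput               bool
	RequiresTimeSortedInput           bool
	SideInputs                        map[string]struct{}
	StateSpecs                        map[string]struct{}
	TimerFamilySpecs                  map[string]struct{}
	RestrictionCoderId                string
}

// isSimpleParDo reports whether a ParDo needs no special preparation.
// Side inputs are deliberately not checked, matching the quoted diff
// with the commented-out line dropped.
func isSimpleParDo(p *parDoPayload) bool {
	return p.OnWindowExpirationTimerFamilySpec == "" &&
		!p.RequestsFinalization &&
		!p.RequiresStableInput &&
		!p.RequiresTimeSortedInput &&
		len(p.StateSpecs) == 0 &&
		len(p.TimerFamilySpecs) == 0 &&
		p.RestrictionCoderId == ""
}

func main() {
	withSideInput := &parDoPayload{SideInputs: map[string]struct{}{"side": {}}}
	splittable := &parDoPayload{RestrictionCoderId: "c1"}
	fmt.Println(isSimpleParDo(withSideInput), isSimpleParDo(splittable))
}
```

Pulling the condition into a named predicate also removes the need for a commented-out clause: a check that no longer applies is simply absent.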


