You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Alexandre Thenorio (JIRA)" <ji...@apache.org> on 2019/04/12 08:13:00 UTC

[jira] [Created] (BEAM-7065) Unable to use combine functions

Alexandre Thenorio created BEAM-7065:
----------------------------------------

             Summary: Unable to use combine functions
                 Key: BEAM-7065
                 URL: https://issues.apache.org/jira/browse/BEAM-7065
             Project: Beam
          Issue Type: Bug
          Components: sdk-go
    Affects Versions: 2.11.0
         Environment: Google Cloud Dataflow
            Reporter: Alexandre Thenorio


I have tried running a simple example to calculate a running average or sum using the `stats` package; however, it does not seem to work.

 

Here's a reproducer

 
{code:java}
package main


import (
    "context"
    "encoding/json"
    "flag"
    "fmt"
    "time"


    "cloud.google.com/go/pubsub"
    "github.com/apache/beam/sdks/go/pkg/beam"
    "github.com/apache/beam/sdks/go/pkg/beam/io/pubsubio"
    "github.com/apache/beam/sdks/go/pkg/beam/log"
    "github.com/apache/beam/sdks/go/pkg/beam/options/gcpopts"
    "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"
    "github.com/apache/beam/sdks/go/pkg/beam/util/pubsubx"
    "github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
    "github.com/apache/beam/sdks/go/pkg/beam/x/debug"
)


// input is the --input command-line flag: the name of the Pub/Sub topic to
// use. It defaults to "iot-data".
var input = flag.String("input", "iot-data", "Pubsub input topic.")


// sensor is a single named reading. It is serialized as JSON of the form
// {"name": "...", "value": N}.
type sensor struct {
    name  string
    value int
}

// sensorJSON is the JSON wire form of sensor. encoding/json ignores
// unexported fields, so without this indirection a sensor would marshal to
// "{}" and unmarshal to its zero value.
type sensorJSON struct {
    Name  string `json:"name"`
    Value int    `json:"value"`
}

// MarshalJSON implements json.Marshaler. It has a value receiver so that both
// sensor and *sensor are encoded through it.
func (s sensor) MarshalJSON() ([]byte, error) {
    return json.Marshal(sensorJSON{Name: s.name, Value: s.value})
}

// UnmarshalJSON implements json.Unmarshaler. On a decoding error it returns
// the error and leaves s unchanged.
func (s *sensor) UnmarshalJSON(b []byte) error {
    var wire sensorJSON
    if err := json.Unmarshal(b, &wire); err != nil {
        return err
    }
    s.name, s.value = wire.Name, wire.Value
    return nil
}


var (
    data = []sensor{
        {name: "temperature", value: 24},
        {name: "humidity", value: 10},
        {name: "temperature", value: 20},
        {name: "temperature", value: 22},
        {name: "humidity", value: 14},
        {name: "humidity", value: 18},
    }
)


func main() {
    flag.Parse()
    beam.Init()


    ctx := context.Background()
    project := gcpopts.GetProject(ctx)


    log.Infof(ctx, "Publishing %v messages to: %v", len(data), *input)


    defer pubsubx.CleanupTopic(ctx, project, *input)
    sub, err := Publish(ctx, project, *input, data...)
    if err != nil {
        log.Fatal(ctx, err)
    }


    log.Infof(ctx, "Running streaming sensor data with subscription: %v", sub.ID())


    p := beam.NewPipeline()
    s := p.Root()


    // Reads sensor data from pubsub
    // Returns PCollection<[]byte>
    col := pubsubio.Read(s, project, *input, &pubsubio.ReadOptions{Subscription: sub.ID()})


    // Transforms incoming bytes from pubsub to a string,int key value
    // where the key is the sensor name and the value is the sensor reading
    // Accepts PCollection<[]byte>
    // Returns PCollection<KV<string,int>>
    data := beam.ParDo(s, extractSensorData, col)


    // Calculate running average per sensor
    //
    // Accpets PCollection<KV<string,int>>
    // Returns PCollection<KV<string,int>>
    sum := stats.MeanPerKey(s, data)
    debug.Print(s, sum)


    if err := beamx.Run(context.Background(), p); err != nil {
        log.Exitf(ctx, "Failed to execute job: %v", err)
    }
}


func extractSensorData(msg []byte) (string, int) {


    ctx := context.Background()


    data := &sensor{}
    if err := json.Unmarshal(msg, data); err != nil {
        log.Fatal(ctx, err)
    }


    return data.name, data.value
}


func Publish(ctx context.Context, project, topic string, messages ...sensor) (*pubsub.Subscription, error) {
    client, err := pubsub.NewClient(ctx, project)
    if err != nil {
        return nil, err
    }
    t, err := pubsubx.EnsureTopic(ctx, client, topic)
    if err != nil {
        return nil, err
    }
    sub, err := pubsubx.EnsureSubscription(ctx, client, topic, fmt.Sprintf("%v.sub.%v", topic, time.Now().Unix()))
    if err != nil {
        return nil, err
    }


    for _, msg := range messages {
        s := &sensor{}
        bytes, err := json.Marshal(s)
        if err != nil {
            return nil, fmt.Errorf("failed to unmarshal '%v': %v", msg, err)
        }
        m := &pubsub.Message{
            Data: ([]byte)(bytes),
            // Attributes: ??
        }
        id, err := t.Publish(ctx, m).Get(ctx)
        if err != nil {
            return nil, fmt.Errorf("failed to publish '%v': %v", msg, err)
        }
        log.Infof(ctx, "Published %v with id: %v", msg, id)
    }
    return sub, nil
}


{code}
 

I ran this code in the following way

 
{noformat}
go run . --project="<my-project>" --runner dataflow  --staging_location gs://<my-gs-bucket>/binaries/ --temp_location gs://<my-gs-bucket>/tmp/ --region "europe-west1" --worker_harness_container_image=alethenorio/beam-go:v2.11.0{noformat}
 

 

 

The code publishes messages to Pub/Sub, then reads them back and attempts to call `stats.MeanPerKey` to compute a running average.

 

When deploying this on Cloud Dataflow, using a container I built myself from the v2.11.0 version (alethenorio/beam-go:v2.11.0), I get the following error every time:

 

 
{noformat}
Worker panic: Unexpected transform URN: beam:transform:combine_grouped_values:v1goroutine 1 [running]:
runtime/debug.Stack(0x50, 0x0, 0x0)
/usr/local/go/src/runtime/debug/stack.go:24 +0x9d
runtime/debug.PrintStack()
/usr/local/go/src/runtime/debug/stack.go:16 +0x22
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/init.hook.func1()
/home/localuser/go/pkg/mod/github.com/apache/beam@v2.11.0+incompatible/sdks/go/pkg/beam/core/runtime/harness/init/init.go:77 +0xac
panic(0xd4b160, 0xc001172f70)
/usr/local/go/src/runtime/panic.go:522 +0x1b5
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*builder).makeLink(0xc00078b980, 0xc0007aa2c0, 0x18, 0xc0007aa180, 0x17, 0x0, 0x400000000000040, 0xffffffffffffffff, 0x0, 0x0)
/home/localuser/go/pkg/mod/github.com/apache/beam@v2.11.0+incompatible/sdks/go/pkg/beam/core/runtime/exec/translate.go:521 +0x308f
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*builder).makePCollection(0xc00078b980, 0xc0007aa2c0, 0x18, 0xc0011775a0, 0xf, 0x0, 0x0)
/home/localuser/go/pkg/mod/github.com/apache/beam@v2.11.0+incompatible/sdks/go/pkg/beam/core/runtime/exec/translate.go:281 +0x5ff
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.UnmarshalPlan(0xc00078b2c0, 0xc00031a0f0, 0xd49820, 0xff5e10)
/home/localuser/go/pkg/mod/github.com/apache/beam@v2.11.0+incompatible/sdks/go/pkg/beam/core/runtime/exec/translate.go:71 +0x393
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.(*control).handleInstruction(0xc000304300, 0x1026be0, 0xc00031a0f0, 0xc000774880, 0xc00031a0f0)
/home/localuser/go/pkg/mod/github.com/apache/beam@v2.11.0+incompatible/sdks/go/pkg/beam/core/runtime/harness/harness.go:155 +0x1ae
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.Main.func2(0x1026be0, 0xc00031a0f0, 0xc000774880)
/home/localuser/go/pkg/mod/github.com/apache/beam@v2.11.0+incompatible/sdks/go/pkg/beam/core/runtime/harness/harness.go:114 +0x1cf
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.Main(0x1026be0, 0xc00031a0f0, 0x7ffe0437db7c, 0xf, 0x7ffe0437db9f, 0xf, 0x0, 0x0)
/home/localuser/go/pkg/mod/github.com/apache/beam@v2.11.0+incompatible/sdks/go/pkg/beam/core/runtime/harness/harness.go:129 +0x786
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/init.hook()
/home/localuser/go/pkg/mod/github.com/apache/beam@v2.11.0+incompatible/sdks/go/pkg/beam/core/runtime/harness/init/init.go:86 +0xee
github.com/apache/beam/sdks/go/pkg/beam/core/runtime.Init()
/home/localuser/go/pkg/mod/github.com/apache/beam@v2.11.0+incompatible/sdks/go/pkg/beam/core/runtime/init.go:42 +0x50
github.com/apache/beam/sdks/go/pkg/beam.Init(...)
/home/localuser/go/pkg/mod/github.com/apache/beam@v2.11.0+incompatible/sdks/go/pkg/beam/forward.go:111
main.main()
/home/localuser/reproducer/main.go:43 +0x8f
{noformat}
 

I realize the Go SDK is not stable yet, but I was uncertain where to post this issue in case the devs are not aware of it, so I hope this is fine.

I get the feeling there is some issue with the gRPC requests sending the wrong URN, but I couldn't find where in the code the `v1goroutine` gets set (I think it needs to be just `v1`).

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)