Posted to user@beam.apache.org by Leonardo Reis <le...@arquivei.com.br> on 2023/06/06 20:43:25 UTC

[Go SDK] Error transforming byte array message from kafka inside ParDo

Hi everyone.
I'm trying to implement a streaming pipeline with the Go SDK on the Dataflow
runner, using kafkaio from the xlang package to read messages from one Kafka
topic, transform them, and publish them to another Kafka topic.

My message is a byte array (an Avro message encoded in wire format). If I just
read the message from one topic and publish it to another topic, everything
works perfectly, but if I try to transform the message inside a ParDo, or even
just pass it to a LogFn, I receive the following error:

    Error message from worker: generic::unknown: process bundle failed for
    instruction process_bundle-2-2 using plan S02-54 : while executing Process
    for Plan[S02-54]:
    2: Discard
    3: PCollection[S02-54-pcollection-61] Out:[2]
    4: ParDo[debug.printKVFn] Out:[3] Sig: func(context.Context, typex.X, typex.Y) (typex.X, typex.Y)
    1: DataSource[S[ptransform-53@localhost:12371], 0] Out:4
       Coder:W;coder-67<KV;coder-68<N;coder-69<bytes;npbayBrypnByteArrayCoder>,N;coder-70<bytes;npbayBrypnByteArrayCoder>>>!GWC
    caused by: source failed
    caused by: source decode failed
    caused by: error decoding bool: received invalid value [3]


The main part of my pipeline is declared like this:

pipeline := beam.NewPipeline()
s := pipeline.Root()

// Kafka consumer properties passed to the cross-language KafkaIO read.
kafkaConsumerOpts := kafkaio.ConsumerConfigs(
	map[string]string{
		"client.dns.lookup": "resolve_canonical_bootstrap_servers_only",
		"sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"engenharia\" password=\"q1w2e3r4\";",
		"security.protocol": "SASL_PLAINTEXT",
		"sasl.mechanism": "PLAIN",
		"group.id": "pocbeamgo",
		"enable.auto.commit": "false",
		"auto.offset.reset": "earliest",
	},
)

// Read the input topic through the expansion service, then log each element.
read := kafkaio.Read(s, config.ExpansionAddr, config.BootstrapServers,
	[]string{config.InputTopic}, kafkaConsumerOpts)
beam.ParDo0(s, &LogFn{}, read)

and this is the LogFn:
import (
	"context"
	"time"

	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
)

func init() {
	register.DoFn1x0[[]byte](&LogFn{})
}

// LogFn is a DoFn to log rides.
type LogFn struct{}

// ProcessElement logs each element it receives.
func (fn *LogFn) ProcessElement(elm []byte) {
	log.Infof(context.Background(), "Ride info: %v", elm)
}

// FinishBundle waits a bit so the job server finishes receiving logs.
func (fn *LogFn) FinishBundle() {
	time.Sleep(2 * time.Second)
}
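Since the value is an Avro message in wire format, a transform inside the ParDo would normally start by splitting off the header. Assuming "wire format" here means the Confluent Schema Registry wire format (one 0x00 magic byte, a 4-byte big-endian schema ID, then the Avro binary body), a minimal stdlib-only sketch of that step follows. splitWireFormat is a hypothetical helper, and decoding the Avro body itself would still need an Avro library and the registered schema. In the pipeline above, it would apply to the value half of the KV<bytes, bytes> elements that the trace shows the Kafka source producing.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// magicByte is the first byte of a Confluent wire-format record
// (assumption: the messages use the Confluent Schema Registry format).
const magicByte = 0x00

// splitWireFormat is a hypothetical helper that separates a wire-format
// record into its schema ID and Avro-encoded payload.
// Layout: byte 0 = magic byte, bytes 1-4 = schema ID (big-endian uint32),
// bytes 5.. = Avro binary data.
func splitWireFormat(msg []byte) (uint32, []byte, error) {
	if len(msg) < 5 {
		return 0, nil, errors.New("message shorter than the 5-byte wire-format header")
	}
	if msg[0] != magicByte {
		return 0, nil, fmt.Errorf("unexpected magic byte %#x", msg[0])
	}
	id := binary.BigEndian.Uint32(msg[1:5])
	return id, msg[5:], nil
}

func main() {
	// Example record: schema ID 42 followed by a 2-byte Avro payload
	// (the Avro encoding of the string "a").
	msg := []byte{0x00, 0x00, 0x00, 0x00, 0x2a, 0x02, 0x61}
	id, payload, err := splitWireFormat(msg)
	fmt.Println(id, payload, err)
}
```

Running it prints "42 [2 97] <nil>"; the payload slice shares memory with the input, so copy it if the element is retained beyond the call.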

Could anyone help me with this error? Thanks a lot.

Leonardo Reis.