Posted to user@spark.apache.org by Iain Cundy <Ia...@amdocs.com> on 2018/01/25 12:11:41 UTC

Kafka deserialization to Structured Streaming SQL - Encoders.bean result doesn't match itself?

Hi All

I'm trying to move from mapWithState to Structured Streaming (Spark v2.2.1), but I've run into a problem.

To convert Kafka data with a binary (protobuf) value to SQL, I'm taking the dataset from readStream and doing:

Dataset<Row> s = dataset.selectExpr("timestamp", "CAST(key as string)", "ETBinnedDeserialize(value) AS message");

ETBinnedDeserialize is a UDF, registered like this:

spark.udf().register("ETBinnedDeserialize",
        (UDF1<byte[], Object>) ETProtobufDecoder::deserialize, Encoders.bean(BinnedForET.class).schema());

ETProtobufDecoder::deserialize looks like this:

public static Object deserialize(byte[] bytes) {
    ExpressionEncoder<BinnedForET> expressionEncoder = (ExpressionEncoder<BinnedForET>) Encoders.bean(BinnedForET.class);
    BinnedForET binned = .... // Convert binary to pojo
    InternalRow row = expressionEncoder.toRow(binned);
    return row;
}

The key point in all this is that the schema for message comes from Encoders.bean(BinnedForET.class), and the object the UDF returns is the result of the same encoder's toRow method.
Yet there is a scala.MatchError. So if not toRow, what should I be calling?

Here is the error:
org.apache.spark.SparkException: Failed to execute user defined function($anonfun$27: (binary) => struct<day:int,hour:int,measCount:int,minute:int,month:int,servingSectorHandle:int,year:int>)
....
Caused by: scala.MatchError: [0,16,c,0,0,1,576c,7e2] (of class org.apache.spark.sql.catalyst.expressions.UnsafeRow)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:236)

I've reduced the class to 7 ints to make it as simple as possible, so there are no Strings to complicate the struct.

The row in the error above seems to be a struct with an initial zero (a null indicator?) followed by the 7 ints I expect, in hex, yet it doesn't match. Maybe it's obviously wrong to a Scala programmer?
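
On the hex dump itself: UnsafeRow's toString appears to print the row's backing memory as 8-byte words in hex. For a row of 7 fixed-width fields, the first word would be the null-tracking bitset and each int field would take one following word. The sketch below is plain Java (no Spark needed) and decodes the dump from the error under that assumption. The class and method names are hypothetical; the field names are copied from the struct in the error message.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class UnsafeRowDumpDecoder {

    // Field names copied from the struct<...> in the error message, in order.
    public static final String[] FIELDS = {
        "day", "hour", "measCount", "minute", "month", "servingSectorHandle", "year"
    };

    // Decodes a dump such as "[0,16,c,...]".
    // Assumption: word 0 is the null bitset and each following 8-byte word
    // holds exactly one int field.
    public static Map<String, Integer> decode(String dump) {
        String[] words = dump.substring(1, dump.length() - 1).split(",");
        if (words.length != FIELDS.length + 1) {
            throw new IllegalArgumentException(
                "expected " + (FIELDS.length + 1) + " words, got " + words.length);
        }
        long nullBits = Long.parseUnsignedLong(words[0], 16);
        Map<String, Integer> out = new LinkedHashMap<>();
        for (int i = 0; i < FIELDS.length; i++) {
            boolean isNull = ((nullBits >>> i) & 1L) == 1L;
            if (isNull) {
                out.put(FIELDS[i], null);
            } else {
                // Keep only the low 32 bits of the word: the int value.
                out.put(FIELDS[i], (int) Long.parseUnsignedLong(words[i + 1], 16));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(decode("[0,16,c,0,0,1,576c,7e2]"));
    }
}
```

Read this way, the dump decodes to day=22, hour=12, measCount=0, minute=0, month=1, servingSectorHandle=22380, year=2018, with no null bits set. The row content looks like valid data, which suggests the mismatch is about the class of the returned object (UnsafeRow) rather than its values.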

Any ideas what I should be calling instead of (or after?) toRow to return the right thing?
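
One direction that may be worth trying, offered as a sketch rather than a confirmed fix. The MatchError is raised in CatalystTypeConverters$StructConverter, which converts an external value into Catalyst's internal form. For a struct return type, the UDF is therefore presumably expected to return an external org.apache.spark.sql.Row (for example from RowFactory.create), not the InternalRow that toRow produces. The values would have to follow the schema's field order, which in the error message is alphabetical by property name. The plain-Java sketch below (no Spark on the classpath) collects a bean's property values in that order. BinnedBean is a hypothetical stand-in for BinnedForET with the seven int properties named in the error, and valuesInSchemaOrder is an invented helper name.

```java
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class BeanRowValues {

    // Hypothetical stand-in for BinnedForET: the seven int properties
    // listed in the struct<...> of the error message.
    public static class BinnedBean {
        private int day, hour, measCount, minute, month, servingSectorHandle, year;
        public int getDay() { return day; }
        public void setDay(int v) { day = v; }
        public int getHour() { return hour; }
        public void setHour(int v) { hour = v; }
        public int getMeasCount() { return measCount; }
        public void setMeasCount(int v) { measCount = v; }
        public int getMinute() { return minute; }
        public void setMinute(int v) { minute = v; }
        public int getMonth() { return month; }
        public void setMonth(int v) { month = v; }
        public int getServingSectorHandle() { return servingSectorHandle; }
        public void setServingSectorHandle(int v) { servingSectorHandle = v; }
        public int getYear() { return year; }
        public void setYear(int v) { year = v; }
    }

    // Returns the readable property values of a bean, sorted by property name.
    // Assumption: this matches the field order of Encoders.bean(...).schema(),
    // as the alphabetical struct in the error message suggests.
    public static Object[] valuesInSchemaOrder(Object bean) {
        try {
            PropertyDescriptor[] props = Introspector
                .getBeanInfo(bean.getClass(), Object.class) // stop at Object: skips "class"
                .getPropertyDescriptors();
            List<PropertyDescriptor> readable = new ArrayList<>();
            for (PropertyDescriptor p : props) {
                if (p.getReadMethod() != null) {
                    readable.add(p);
                }
            }
            // Sort explicitly instead of relying on the introspector's own order.
            readable.sort(Comparator.comparing(PropertyDescriptor::getName));
            Object[] values = new Object[readable.size()];
            for (int i = 0; i < values.length; i++) {
                values[i] = readable.get(i).getReadMethod().invoke(bean);
            }
            return values;
        } catch (Exception e) {
            throw new IllegalStateException("could not read bean properties", e);
        }
    }

    public static void main(String[] args) {
        BinnedBean b = new BinnedBean();
        b.setDay(22); b.setHour(12); b.setMonth(1);
        b.setServingSectorHandle(22380); b.setYear(2018);
        Object[] values = valuesInSchemaOrder(b);
        // In the UDF, the idea would be: return RowFactory.create(values);
        System.out.println(Arrays.toString(values));
    }
}
```

If this reading is right, deserialize would end with return RowFactory.create(values) in place of return expressionEncoder.toRow(binned). This has not been verified against 2.2.1, and nested or non-primitive properties would likely need their own conversion.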

Cheers
Iain 
