Posted to user@spark.apache.org by Iain Cundy <Ia...@amdocs.com> on 2018/01/25 12:11:41 UTC
Kafka deserialization to Structured Streaming SQL - Encoders.bean result doesn't match itself?
Hi All
I'm trying to move from mapWithState to Structured Streaming (Spark v2.2.1), but I've run into a problem.
To convert Kafka records with a binary (protobuf) value into SQL columns, I take the dataset from readStream and do:
Dataset<Row> s = dataset.selectExpr("timestamp", "CAST(key as string)", "ETBinnedDeserialize(value) AS message");
ETBinnedDeserialize is a UDF, registered like this:
spark.udf().register("ETBinnedDeserialize",
    (UDF1<byte[], Object>) ETProtobufDecoder::deserialize, Encoders.bean(BinnedForET.class).schema());
ETProtobufDecoder::deserialize looks like this:
public static Object deserialize(byte[] bytes) {
    ExpressionEncoder<BinnedForET> expressionEncoder = (ExpressionEncoder<BinnedForET>) Encoders.bean(BinnedForET.class);
    BinnedForET binned = .... // Convert binary to pojo
    InternalRow row = expressionEncoder.toRow(binned);
    return row;
}
The key point is that the schema of message comes from Encoders.bean(BinnedForET.class), and the object the UDF returns is produced by the toRow method of that same encoder.
Yet it fails with a scala.MatchError. If not toRow, what should I be calling?
Here is the error:
org.apache.spark.SparkException: Failed to execute user defined function($anonfun$27: (binary) => struct<day:int,hour:int,measCount:int,minute:int,month:int,servingSectorHandle:int,year:int>)
....
Caused by: scala.MatchError: [0,16,c,0,0,1,576c,7e2] (of class org.apache.spark.sql.catalyst.expressions.UnsafeRow)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:236)
I've reduced the class to seven ints to make it as simple as possible, so there are no Strings to complicate the Scala struct.
The Scala object above appears to be a struct with an initial zero (a null indicator?) followed by the seven ints I expect, printed in hex, yet it doesn't match. Maybe the problem is obvious to a Scala programmer?
Any ideas what I should be calling instead of (or after?) toRow to return the right thing?
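To make the hex reading concrete, here is a small JDK-only sketch that decodes the printed row. It assumes, as the post guesses, that the first word is a null-tracking bitset (zero meaning no field is null) and that each following comma-separated word is one field printed in hexadecimal, in the order of the struct<...> type in the error message. UnsafeRowDump and decode are hypothetical names, not Spark APIs.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class UnsafeRowDump {
    // Field names in the order shown by the struct<...> type in the error message.
    static final String[] FIELDS = {
        "day", "hour", "measCount", "minute", "month", "servingSectorHandle", "year"
    };

    // Decodes a printed row of the form "[w0,w1,...]":
    // w0 is assumed to be the null bitset, then one hex word per field.
    static Map<String, Long> decode(String printed) {
        String[] words = printed.substring(1, printed.length() - 1).split(",");
        if (words.length != FIELDS.length + 1) {
            throw new IllegalArgumentException("unexpected word count: " + words.length);
        }
        // Simplification: this sketch refuses rows where any null bit is set.
        if (Long.parseUnsignedLong(words[0], 16) != 0L) {
            throw new IllegalArgumentException("null bitset is non-zero");
        }
        Map<String, Long> values = new LinkedHashMap<>();
        for (int i = 0; i < FIELDS.length; i++) {
            values.put(FIELDS[i], Long.parseUnsignedLong(words[i + 1], 16));
        }
        return values;
    }

    public static void main(String[] args) {
        System.out.println(decode("[0,16,c,0,0,1,576c,7e2]"));
    }
}
```

Running main prints {day=22, hour=12, measCount=0, minute=0, month=1, servingSectorHandle=22380, year=2018}. The values look sensible, which suggests the row contents are fine and the mismatch is about the kind of object returned rather than its data.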
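One possible direction, offered as an unverified suggestion rather than a confirmed fix: for a struct return type, Spark's Java UDF path appears to expect an external org.apache.spark.sql.Row (for example one built with RowFactory.create(values)), not an InternalRow. The trace is consistent with that: the MatchError is raised in CatalystTypeConverters$StructConverter.toCatalystImpl when it is handed an UnsafeRow. If so, deserialize would need to supply the values in the schema's field order, which for Encoders.bean is name order (day, hour, measCount, ...), as the struct<...> in the error shows, not declaration order. The JDK-only sketch below reads a bean's properties through java.beans.Introspector (our assumption about how Spark derives bean fields) and returns the values in that order. DateBean, beanProperties and valuesInSchemaOrder are hypothetical names, and passing the array to RowFactory.create is the assumed Spark-side step, not shown here.

```java
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BeanFieldOrder {

    // Hypothetical bean: declared order (year, month, day) differs from name order.
    public static class DateBean {
        private int year;
        private int month;
        private int day;

        public DateBean() {}
        public DateBean(int year, int month, int day) { this.year = year; this.month = month; this.day = day; }

        public int getYear() { return year; }
        public void setYear(int year) { this.year = year; }
        public int getMonth() { return month; }
        public void setMonth(int month) { this.month = month; }
        public int getDay() { return day; }
        public void setDay(int day) { this.day = day; }
    }

    // Properties with both a getter and a setter, skipping the implicit "class" property,
    // in the order Introspector reports them (sorted by property name).
    static List<PropertyDescriptor> beanProperties(Class<?> beanClass) {
        try {
            List<PropertyDescriptor> props = new ArrayList<>();
            for (PropertyDescriptor pd : Introspector.getBeanInfo(beanClass).getPropertyDescriptors()) {
                if (!pd.getName().equals("class") && pd.getReadMethod() != null && pd.getWriteMethod() != null) {
                    props.add(pd);
                }
            }
            return props;
        } catch (IntrospectionException e) {
            throw new IllegalArgumentException(e);
        }
    }

    // Reads the bean's values in that same order; these are the values a Row would be built from.
    static Object[] valuesInSchemaOrder(Object bean) {
        List<Object> values = new ArrayList<>();
        for (PropertyDescriptor pd : beanProperties(bean.getClass())) {
            try {
                values.add(pd.getReadMethod().invoke(bean));
            } catch (IllegalAccessException | InvocationTargetException e) {
                throw new IllegalStateException(e);
            }
        }
        return values.toArray();
    }

    public static void main(String[] args) {
        for (PropertyDescriptor pd : beanProperties(DateBean.class)) {
            System.out.print(pd.getName() + " ");
        }
        System.out.println();
        System.out.println(Arrays.toString(valuesInSchemaOrder(new DateBean(2018, 1, 22))));
    }
}
```

For DateBean(2018, 1, 22), main lists the properties as day, month, year and the values as [22, 1, 2018], so the array follows name order regardless of how the fields are declared.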
Cheers
Iain