You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Kuttaiah Robin <ku...@gmail.com> on 2018/09/18 04:12:34 UTC
Spark FlatMapGroupsWithStateFunction throws "cannot resolve
'named_struct()' due to data type mismatch" in 'SerializeFromObject'
Hello,
I am using FlatMapGroupsWithStateFunction in my Spark streaming application.
FlatMapGroupsWithStateFunction<String, Row, SessionInfo, SessionUpdate>
idstateUpdateFunction =
new FlatMapGroupsWithStateFunction<String, Row, SessionInfo,
SessionUpdate>() {.....}
The SessionUpdate class runs into trouble when the highlighted code is added,
which throws the exception below. The same attribute, milestones, with its
setter/getter, has also been added to SessionInfo (the input class), but it
does not throw an exception there.
public static class SessionUpdate implements Serializable {
private static final long serialVersionUID = -3858977319192658483L;
*private ArrayList<GenericRowWithSchema> milestones = new
ArrayList<GenericRowWithSchema>();*
private Timestamp processingTimeoutTimestamp;
public SessionUpdate() {
super();
}
public SessionUpdate(String instanceId, *ArrayList<GenericRowWithSchema>
milestones*, Timestamp processingTimeoutTimestamp) {
super();
this.instanceId = instanceId;
*this.milestones = milestones;*
this.processingTimeoutTimestamp = processingTimeoutTimestamp;
}
public String getInstanceId() {
return instanceId;
}
public void setInstanceId(String instanceId) {
this.instanceId = instanceId;
}
*public ArrayList<GenericRowWithSchema> getMilestones() {*
* return milestones;*
*}*
*public void setMilestones(ArrayList<GenericRowWithSchema> milestones) {*
* this.milestones = milestones;*
*}*
public Timestamp getProcessingTimeoutTimestamp() {
return processingTimeoutTimestamp;
}
public void setProcessingTimeoutTimestamp(Timestamp
processingTimeoutTimestamp) {
this.processingTimeoutTimestamp = processingTimeoutTimestamp;
}
}
Exception:
ERROR cannot resolve 'named_struct()' due to data type mismatch: input to
function named_struct requires at least one argument;;
'SerializeFromObject [staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0,
oracle.insight.spark.event_processor.EventProcessor$SessionUpdate,
true]).getInstanceId, true, false) AS instanceId#62,
mapobjects(MapObjects_loopValue2, MapObjects_loopIsNull2, ObjectType(class
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema), if
(isnull(lambdavariable(MapObjects_loopValue2, MapObjects_loopIsNull2,
ObjectType(class
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema), true)))
null else named_struct(), assertnotnull(input[0,
oracle.insight.spark.event_processor.EventProcessor$SessionUpdate,
true]).getMilestones, None) AS milestones#63, staticinvoke(class
org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType,
fromJavaTimestamp, assertnotnull(input[0,
oracle.insight.spark.event_processor.EventProcessor$SessionUpdate,
true]).getProcessingTimeoutTimestamp, true, false) AS
processingTimeoutTimestamp#64]
+- FlatMapGroupsWithState <function3>, cast(value#54 as string).toString,
createexternalrow(EventTime#23.toString, InstanceID#24.toString,
Model#25.toString, Milestone#26.toString, Region#27.toString,
SalesOrganization#28.toString, ProductName#29.toString,
ReasonForQuoteReject#30.toString, ReasonforRejectionBy#31.toString,
OpportunityAmount#32.toJavaBigDecimal, Discount#33.toJavaBigDecimal,
TotalQuoteAmount#34.toJavaBigDecimal, NetQuoteAmount#35.toJavaBigDecimal,
ApprovedDiscount#36.toJavaBigDecimal, TotalOrderAmount#37.toJavaBigDecimal,
StructField(EventTime,StringType,true),
StructField(InstanceID,StringType,true),
StructField(Model,StringType,true), StructField(Milestone,StringType,true),
StructField(Region,StringType,true),
StructField(SalesOrganization,StringType,true),
StructField(ProductName,StringType,true),
StructField(ReasonForQuoteReject,StringType,true),
StructField(ReasonforRejectionBy,StringType,true), ... 6 more fields),
[value#54], [EventTime#23, InstanceID#24, Model#25, Milestone#26,
Region#27, SalesOrganization#28, ProductName#29, ReasonForQuoteReject#30,
ReasonforRejectionBy#31, OpportunityAmount#32, Discount#33,
TotalQuoteAmount#34, NetQuoteAmount#35, ApprovedDiscount#36,
TotalOrderAmount#37], obj#61:
oracle.insight.spark.event_processor.EventProcessor$SessionUpdate,
class[instanceId[0]: string, milestones[0]: array<struct<>>,
processingTimeoutTimestamp[0]: timestamp], Append, false,
ProcessingTimeTimeout
Schema looks like
{"Name":"EventTime", "DataType":"TimestampType"},
{"Name":"InstanceID", "DataType":"STRING", "Length":100},
{"Name":"Model", "DataType":"STRING", "Length":100},
{"Name":"Milestone", "DataType":"STRING", "Length":100},
{"Name":"Region", "DataType":"STRING", "Length":100},
{"Name":"SalesOrganization", "DataType":"STRING", "Length":100},
{"Name":"ProductName", "DataType":"STRING", "Length":100},
{"Name":"ReasonForQuoteReject", "DataType":"STRING", "Length":100},
{"Name":"ReasonforRejectionBy", "DataType":"STRING", "Length":100},
//Note: org.apache.spark.sql.types.DataTypes.createDecimalType(precision(),
scale())
{"Name":"OpportunityAmount", "DataType":"DECIMAL",
"Precision":38,"Scale":2},
{"Name":"Discount", "DataType":"DECIMAL",
"Precision":38,"Scale":2},
{"Name":"TotalQuoteAmount", "DataType":"DECIMAL",
"Precision":38,"Scale":2},
{"Name":"NetQuoteAmount", "DataType":"DECIMAL",
"Precision":38,"Scale":2},
{"Name":"ApprovedDiscount", "DataType":"DECIMAL",
"Precision":38,"Scale":2},
{"Name":"TotalOrderAmount", "DataType":"DECIMAL",
"Precision":38,"Scale":2}
Please let me know how to debug what is going wrong in this use case.
thanks.
Robin Kuttaiah