You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "mzz (Jira)" <ji...@apache.org> on 2020/07/28 09:56:00 UTC
[jira] [Created] (FLINK-18741) ProcessWindowFunction's process
function exception
mzz created FLINK-18741:
---------------------------
Summary: ProcessWindowFunction's process function exception
Key: FLINK-18741
URL: https://issues.apache.org/jira/browse/FLINK-18741
Project: Flink
Issue Type: Bug
Components: API / DataStream
Affects Versions: 1.10.0
Reporter: mzz
I use ProcessWindowFunction to achieve PV calculation, but when rewriting process, the user-defined state value cannot be returned。
code:
{code:java}
tem.keyBy(x =>
(x._1, x._2, x._4, x._5, x._6, x._7, x._8))
.timeWindow(Time.seconds(15 * 60)) //15 min window
.process(new ProcessWindowFunction[(String, String, String, String, String, String, String, String, String), CkResult, (String, String, String, String, String, String, String), TimeWindow] {
var clickCount: ValueState[Long] = _
* var requestCount: ValueState[Long] = _
* var returnCount: ValueState[Long] = _
var videoCount: ValueState[Long] = _
var noVideoCount: ValueState[Long] = _
override def open(parameters: Configuration): Unit = {
clickCount = getRuntimeContext.getState(new ValueStateDescriptor("clickCount", classOf[Long]))
* requestCount = getRuntimeContext.getState(new ValueStateDescriptor("requestCount", classOf[Long]))*
returnCount = getRuntimeContext.getState(new ValueStateDescriptor("returnCount", classOf[Long]))
videoCount = getRuntimeContext.getState(new ValueStateDescriptor("videoCount", classOf[Long]))
noVideoCount = getRuntimeContext.getState(new ValueStateDescriptor("noVideoCount", classOf[Long]))
}
override def process(key: (String, String, String, String, String, String, String), context: Context, elements: Iterable[(String, String, String, String, String, String, String, String, String)], out: Collector[CkResult]) = {
try {
var clickNum: Long = clickCount.value
val dateNow = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd")).toLong
var requestNum: Long = requestCount.value
var returnNum: Long = returnCount.value
var videoNum: Long = videoCount.value
var noVideoNum: Long = noVideoCount.value
if (requestNum == null) {
requestNum = 0
}
val ecpm = key._7.toDouble.formatted("%.2f").toFloat
val created_at = getSecondTimestampTwo(new Date)
* elements.foreach(e => {
if ("adreq".equals(e._3)) {
requestNum += 1
println(key._1, requestNum)
}
})
requestCount.update(requestNum)
println(requestNum, key._1)*
out.collect(CkResult(dateNow, (created_at - getZero_time) / (60 * 15), key._2, key._3, key._4, key._5, key._3 + "_" + key._4 + "_" + key._5, key._6, key._1, requestCount.value, returnCount.value, fill_rate, noVideoCount.value + videoCount.value,
expose_rate, clickCount.value, click_rate, ecpm, (noVideoCount.value * ecpm + videoCount.value * ecpm / 1000.toFloat).formatted("%.2f").toFloat, created_at))
}
catch {
case e: Exception => println(key, e)
}
}
})
{code}
{code:java}
elements.foreach(e => {
if ("adreq".equals(e._3)) {
requestNum += 1
println(key._1, requestNum)
// The values printed here like :
//(key,1)
//(key,2)
//(key,3)
}
})
//But print outside the for loop always like :
//(key,0)
println(requestNum, key._1)
{code}
who can help me ,plz thx。
--
This message was sent by Atlassian Jira
(v8.3.4#803005)