You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Hyukjin Kwon (Jira)" <ji...@apache.org> on 2019/09/08 01:49:00 UTC
[jira] [Resolved] (SPARK-28999) Exception in thread "main"
org.apache.spark.sql.AnalysisException: Multiple mapGroupsWithStates are
not supported on a streaming DataFrames/Datasets;;
[ https://issues.apache.org/jira/browse/SPARK-28999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon resolved SPARK-28999.
----------------------------------
Resolution: Invalid
Please ask questions on the mailing lists. See https://spark.apache.org/community.html
> Exception in thread "main" org.apache.spark.sql.AnalysisException: Multiple mapGroupsWithStates are not supported on a streaming DataFrames/Datasets;;
> ------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-28999
> URL: https://issues.apache.org/jira/browse/SPARK-28999
> Project: Spark
> Issue Type: Question
> Components: Spark Core
> Affects Versions: 2.3.0
> Reporter: ruiliang
> Priority: Major
> Attachments: 微信图片_20190906170459.png
>
>
> Exception in thread "main" org.apache.spark.sql.AnalysisException: Multiple mapGroupsWithStates are not supported on a streaming DataFrames/Datasets;;
>
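> How can I chain two stateful operations, where the first one maintains per-order state and the second one summarizes the state data produced by the first? Is there a good way to do this without sinking the first result to Kafka and reading it back through a Kafka source in another Structured Streaming query, as in the code below? Thank you.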
> {code:java}
> //代码占位符
> package org.roy.demo.streaming.bus
> import java.sql.Timestamp
> import org.apache.log4j.{Level, Logger}
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions.current_timestamp
> import org.apache.spark.sql.streaming._
> import streaming.StreamingExamples
> /**
>  * Structured Streaming job that keeps the latest state of every order and then rolls the
>  * per-order updates up into per-store totals.
>  *
>  * Input lines are read from a socket and are expected to look like
>  * `orderId,otype,storeId,money,orderDate`, for example `1001,1,10,200,2019-09-03`.
>  *
>  * NOTE: the pipeline chains two `mapGroupsWithState` operators. This is the plan for which
>  * Spark 2.3.0 reported "Multiple mapGroupsWithStates are not supported on a streaming
>  * DataFrames/Datasets", so the structure itself is rejected by the analyzer; the fixes below
>  * only address defects inside the two state functions.
>  */
> object StructuredOrderStateListRturn {
>   Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
>   StreamingExamples.setStreamingLogLevels()
>
>   /**
>    * Builds and starts the streaming query, then blocks until it terminates.
>    *
>    * @param args command line arguments (not used)
>    */
>   def main(args: Array[String]): Unit = {
>     import scala.util.Try
>
>     val spark = SparkSession
>       .builder.master("local[*]")
>       .appName("StructuredSessionization")
>       .getOrCreate()
>     import spark.implicits._
>
>     // Create DataFrame representing the stream of input lines from connection to host:port
>     val lines = spark.readStream
>       .format("socket")
>       .option("host", "10.200.102.192")
>       .option("port", 9998)
>       .load().withColumn("current_timestamp", current_timestamp)
>
>     // Sample input:
>     //100,1,10,20,2019-09-03
>     //1001,1,10,200,2019-09-03
>     //1001,1,10,2000,2019-09-03
>     // Lines with fewer than five fields, or with non-numeric otype/money, are dropped.
>     // flatMap is used so that no null element is ever placed into the typed Dataset.
>     val events = lines
>       .as[(String, Timestamp)]
>       .flatMap { case (line, timestamp) =>
>         val orderInfo = line.split(",")
>         if (orderInfo.length > 4) {
>           Try(NOEvent(orderInfo(0), orderInfo(1).toInt, orderInfo(2), orderInfo(3).toDouble, orderInfo(4), timestamp)).toOption.toList
>         } else {
>           List.empty[NOEvent]
>         }
>       }
>
>     /**
>      * First pass: maintain the state of each order, return the latest order and attach the
>      * amount of the previous version of that order.
>      */
>     val orderUpdates = events
>       .groupByKey(event => event.orderId)
>       // orderInfoStore = type of the stored state, orderInfoStoreUpdate = type of the output
>       .mapGroupsWithState[orderInfoStore, orderInfoStoreUpdate](GroupStateTimeout.ProcessingTimeTimeout) {
>         case (orderId: String, events: Iterator[NOEvent], state: GroupState[orderInfoStore]) =>
>           if (state.hasTimedOut) {
>             // Timed out: drop the stored order and emit a final update flagged expired = true.
>             val expiredOrder = state.get
>             val finalUpdate =
>               orderInfoStoreUpdate(orderId, expiredOrder.otype, expiredOrder.storeId, expiredOrder.money, 0.0, expiredOrder.orderDate, expiredOrder.timestamp, expired = true)
>             state.remove()
>             finalUpdate
>           } else {
>             // Not timed out: the newest event replaces whatever was stored for this order.
>             // presumably the iterator is non-empty whenever the group has not timed out
>             val lastEvent = events.toList.last
>             // Amount of the previous version of this order, 0.0 for a brand-new order.
>             val oldOrder = if (state.exists) state.get.money else 0.0
>             val updatedSession =
>               orderInfoStore(orderId, lastEvent.otype, lastEvent.storeId, lastEvent.money, oldOrder, lastEvent.orderDate, lastEvent.timestamp)
>             // Store the refreshed order.
>             state.update(updatedSession)
>             // Expire the order if no data is received for it for 3600 seconds.
>             state.setTimeoutDuration("3600 seconds")
>             orderInfoStoreUpdate(orderId, updatedSession.otype, updatedSession.storeId, updatedSession.money, oldOrder, updatedSession.orderDate, updatedSession.timestamp, expired = false)
>           }
>       }
>
>     /**
>      * Second pass: compute the per-store figures from the order updates.
>      */
>     val storeUpdate = orderUpdates
>       .groupByKey(order => order.storeId)
>       .mapGroupsWithState[g1InfoStore, g1InfoStoreUpdate](GroupStateTimeout.ProcessingTimeTimeout) {
>         case (storeId: String, events: Iterator[orderInfoStoreUpdate], state: GroupState[g1InfoStore]) =>
>           if (state.hasTimedOut) {
>             // Timed out: drop the stored store totals and emit a final update flagged expired = true.
>             val expiredStore = state.get
>             val finalUpdate =
>               g1InfoStoreUpdate(storeId, expiredStore.num, expiredStore.money, expiredStore.timestamp, expired = true)
>             state.remove()
>             finalUpdate
>           } else {
>             // An Iterator can be traversed only once, so materialize the batch before
>             // deriving the count and the two sums from it.
>             val batch = events.toList
>             val batchMoney = batch.map(_.money).sum
>             // NOTE(review): every update counts as one order, including a new version of an
>             // order that was already counted - confirm this is the intended meaning of num.
>             val updatedStore =
>               if (state.exists) {
>                 val previous = state.get
>                 // Store total += new order amounts - amounts of the versions they replace.
>                 val replacedMoney = batch.map(_.oldMoney).sum
>                 g1InfoStore(storeId, previous.num + batch.size, previous.money + batchMoney - replacedMoney, previous.timestamp)
>               } else {
>                 // No stored state yet, so the timestamp comes from the last update in the batch.
>                 g1InfoStore(storeId, batch.size, batchMoney, batch.last.timestamp)
>               }
>             // Store the refreshed totals.
>             state.update(updatedStore)
>             // Expire the store if no data is received for it for 3600 seconds.
>             state.setTimeoutDuration("3600 seconds")
>             // A live update is not expired.
>             g1InfoStoreUpdate(storeId, updatedStore.num, updatedStore.money, updatedStore.timestamp, expired = false)
>           }
>       }
>
>     // Further aggregation of the per-store statistics (disabled):
>     // storeUpdate.createOrReplaceTempView("update_tmp")
>     // spark.sql("select storeId,otype, count(1) num,sum(money) as moneys ,sum(oldMoney) as oldMoney from update_tmp group by storeId,otype ")
>     val query = storeUpdate
>       .writeStream
>       .outputMode("update")
>       .format("console")
>       .start()
>     query.awaitTermination()
>   }
> }
> /**
>  * One input event, built from a comma-separated input line plus the timestamp column
>  * attached to that line.
>  *
>  * @param orderId   first field of the line
>  * @param otype     second field of the line, parsed as an integer
>  * @param storeId   third field of the line
>  * @param money     fourth field of the line, parsed as a double
>  * @param orderDate fifth field of the line
>  * @param timestamp value of the `current_timestamp` column for the line
>  */
> case class NOEvent(
>   orderId: String,
>   otype: Int,
>   storeId: String,
>   money: Double,
>   orderDate: String,
>   timestamp: Timestamp
> )
> /**
>  * State stored per order by the first grouping stage.
>  *
>  * @param oldMoney amount of the previously stored version of the order, 0 for a new order
>  */
> case class orderInfoStore(
>   orderId: String,
>   otype: Int,
>   storeId: String,
>   money: Double,
>   oldMoney: Double,
>   orderDate: String,
>   timestamp: Timestamp
> )
> /**
>  * Record emitted per order by the first grouping stage.
>  *
>  * @param oldMoney amount of the previously stored version of the order
>  * @param expired  true when the record is emitted because the order state timed out
>  */
> case class orderInfoStoreUpdate(
>   orderId: String,
>   otype: Int,
>   storeId: String,
>   money: Double,
>   oldMoney: Double,
>   orderDate: String,
>   timestamp: Timestamp,
>   expired: Boolean
> )
> /**
>  * State of the first grouping level: the figures stored per store.
>  *
>  * @param num   number of orders counted for the store
>  * @param money accumulated amount for the store
>  */
> case class g1InfoStore(
>   storeid: String,
>   // otype: Int,
>   num: Int,
>   money: Double,
>   timestamp: Timestamp
> )
> /**
>  * Record emitted per store by the second grouping stage.
>  *
>  * @param expired true when the record is emitted because the store state timed out
>  */
> case class g1InfoStoreUpdate(
>   storeid: String,
>   // otype: Int,
>   num: Int,
>   money: Double,
>   timestamp: Timestamp,
>   expired: Boolean
> )
> {code}
>
>
>
> I changed the approach: the store state now maintains a list of its orders, and the store totals are recalculated from that list. However, this means the state has to keep the detail of every individual order, which uses a lot of resources. I wonder if there is any other way to deal with this kind of data.
> {code:java}
> package org.roy.demo.streaming.bus
> import java.sql.Timestamp
> import org.apache.log4j.{Level, Logger}
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions.current_timestamp
> import org.apache.spark.sql.streaming._
> import streaming.StreamingExamples
> /**
>  * create by Roy 2019/09/06
>  *
>  * Counts the orders of each store and sums their amounts, using a single
>  * `mapGroupsWithState` keyed by store. The state of a store keeps the latest amount of
>  * every order seen for it, so that a new version of an order replaces the amount of the
>  * previous version instead of being added on top of it.
>  *
>  * Input lines are read from a socket and are expected to look like
>  * `orderId,otype,storeId,money,orderTime`, for example `1001,1,10,200,2019-09-03`.
>  */
> object StructuredStoreOrderState {
>   Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
>   StreamingExamples.setStreamingLogLevels()
>
>   /**
>    * Builds and starts the streaming query, then blocks until it terminates.
>    *
>    * @param args command line arguments (not used)
>    */
>   def main(args: Array[String]): Unit = {
>     import scala.util.Try
>
>     val spark = SparkSession
>       .builder.master("local[*]")
>       .appName("StructuredSessionization")
>       .getOrCreate()
>     import spark.implicits._
>
>     // Create DataFrame representing the stream of input lines from connection to host:port
>     val lines = spark.readStream
>       .format("socket")
>       .option("host", "10.200.102.192")
>       .option("port", 9998)
>       .load().withColumn("current_timestamp", current_timestamp)
>
>     // Sample input:
>     //100,1,10,20,2019-09-03
>     //1001,1,10,200,2019-09-03
>     //1001,1,10,2000,2019-09-03
>     // Lines with fewer than five fields, or with non-numeric otype/money, are dropped.
>     // flatMap is used so that no null element is ever placed into the typed Dataset.
>     val events = lines
>       .as[(String, Timestamp)]
>       .flatMap { case (line, timestamp) =>
>         val orderInfo = line.split(",")
>         if (orderInfo.length > 4) {
>           Try(dataEvent(orderInfo(0), orderInfo(1).toInt, orderInfo(2), orderInfo(3).toDouble, orderInfo(4), timestamp)).toOption.toList
>         } else {
>           List.empty[dataEvent]
>         }
>       }
>
>     val orderUpdates = events
>       .groupByKey(event => event.storeId)
>       .mapGroupsWithState[storeOrderInfoState, storeOrderInfoStateUpdate](GroupStateTimeout.ProcessingTimeTimeout) {
>         case (key: String, values: Iterator[dataEvent], state: GroupState[storeOrderInfoState]) =>
>           val max_time = new Timestamp(System.currentTimeMillis())
>           if (state.hasTimedOut) {
>             // Timed out: drop the stored totals and emit a final update flagged expired = true.
>             val expiredStore = state.get
>             val finalUpdate =
>               storeOrderInfoStateUpdate(key, expiredStore.orderNum, expiredStore.orderMoney, max_time, expired = true)
>             state.remove()
>             finalUpdate
>           } else {
>             // Start from the stored totals, or from empty totals for a store seen for the first time.
>             val seed: (Map[String, Double], Int, Double) =
>               if (state.exists) {
>                 val previous = state.get
>                 (previous.orderInfoStoreMap, previous.orderNum, previous.orderMoney)
>               } else {
>                 (Map.empty[String, Double], 0, 0.0)
>               }
>             // Apply the events one by one against the running order map, so that an order id
>             // repeated inside the same batch replaces its earlier amount instead of being
>             // counted and summed twice.
>             val (orders, orderNum, orderMoney) = values.foldLeft(seed) {
>               case ((knownOrders, count, total), e) =>
>                 knownOrders.get(e.orderId) match {
>                   case Some(oldMoney) =>
>                     // Known order: total += new amount - old amount.
>                     (knownOrders.updated(e.orderId, e.money), count, total + e.money - oldMoney)
>                   case None =>
>                     (knownOrders.updated(e.orderId, e.money), count + 1, total + e.money)
>                 }
>             }
>             val updatedSession = storeOrderInfoState(key, orderNum, orderMoney, orders, max_time)
>             // Store the refreshed totals.
>             println("updatedSession" + updatedSession)
>             println(updatedSession.orderMoney)
>             state.update(updatedSession)
>             // Expire the store if no data is received for it for 3600 seconds.
>             state.setTimeoutDuration("3600 seconds")
>             storeOrderInfoStateUpdate(key, updatedSession.orderNum, updatedSession.orderMoney, max_time, expired = false)
>           }
>       }
>
>     val query = orderUpdates
>       .writeStream
>       .outputMode("update")
>       .format("console")
>       .start()
>     query.awaitTermination()
>   }
> }
> /**
>  * One input event, built from a comma-separated input line plus the timestamp column
>  * attached to that line.
>  *
>  * @param orderId   first field of the line
>  * @param otype     second field of the line, parsed as an integer
>  * @param storeId   third field of the line
>  * @param money     fourth field of the line, parsed as a double
>  * @param orderTime fifth field of the line
>  * @param timestamp value of the `current_timestamp` column for the line
>  */
> case class dataEvent(
>   orderId: String,
>   otype: Int,
>   storeId: String,
>   money: Double,
>   orderTime: String,
>   timestamp: Timestamp
> )
> /**
>  * Latest state of a single order.
>  *
>  * Not referenced by the job in this snippet.
>  */
> case class orderEnventInfo(
>   orderId: String,
>   otype: Int,
>   storeId: String,
>   money: Double,
>   orderTime: String,
>   timestamp: Timestamp
> )
> /**
>  * State stored per store: the running totals plus a table of all orders of the store.
>  *
>  * Alternatives considered for the order table: Map[String, orderEnventInfo] or Seq[orderEnventInfo].
>  *
>  * @param orderNum          number of orders counted for the store
>  * @param orderMoney        accumulated amount for the store
>  * @param orderInfoStoreMap latest amount of each order, keyed by order id
>  */
> case class storeOrderInfoState(
>   storeId: String,
>   orderNum: Int,
>   orderMoney: Double,
>   orderInfoStoreMap: Map[String, Double],
>   timestamp: Timestamp
> )
> /**
>  * Computed result emitted per store.
>  *
>  * @param expired true when the record is emitted because the store state timed out
>  */
> case class storeOrderInfoStateUpdate(
>   storeId: String,
>   orderNum: Int,
>   orderMoney: Double,
>   timestamp: Timestamp,
>   expired: Boolean
> )
> {code}
> The final output I need is this
> !微信图片_20190906170459.png!
--
This message was sent by Atlassian Jira
(v8.3.2#803003)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org