You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Hyukjin Kwon (Jira)" <ji...@apache.org> on 2019/09/08 01:49:00 UTC

[jira] [Resolved] (SPARK-28999) Exception in thread "main" org.apache.spark.sql.AnalysisException: Multiple mapGroupsWithStates are not supported on a streaming DataFrames/Datasets;;

     [ https://issues.apache.org/jira/browse/SPARK-28999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-28999.
----------------------------------
    Resolution: Invalid

Please ask questions into mailing lists. See https://spark.apache.org/community.html

> Exception in thread "main" org.apache.spark.sql.AnalysisException: Multiple mapGroupsWithStates are not supported on a streaming DataFrames/Datasets;;
> ------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-28999
>                 URL: https://issues.apache.org/jira/browse/SPARK-28999
>             Project: Spark
>          Issue Type: Question
>          Components: Spark Core
>    Affects Versions: 2.3.0
>            Reporter: ruiliang
>            Priority: Major
>         Attachments: 微信图片_20190906170459.png
>
>
> Exception in thread "main" org.apache.spark.sql.AnalysisException: Multiple mapGroupsWithStates are not supported on a streaming DataFrames/Datasets;;
>  
> How do you carry out a double digit operation?The first processing state and the second summarizing the processed state data?Is there any good way not to sink->kafka, kafka source->spark structured like this?thank you
> {code:java}
> //代码占位符
> package org.roy.demo.streaming.bus
> import java.sql.Timestamp
> import org.apache.log4j.{Level, Logger}
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions.current_timestamp
> import org.apache.spark.sql.streaming._
> import streaming.StreamingExamples
> object StructuredOrderStateListRturn {
>   Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
>   StreamingExamples.setStreamingLogLevels()
>   def main(args: Array[String]): Unit = {
>     val spark = SparkSession
>       .builder.master("local[*]")
>       .appName("StructuredSessionization")
>       .getOrCreate()
>     import spark.implicits._
>     // Create DataFrame representing the stream of input lines from connection to host:port
>     val lines = spark.readStream
>       .format("socket")
>       .option("host", "10.200.102.192")
>       .option("port", 9998)
>       .load().withColumn("current_timestamp", current_timestamp)
>     //100,1,10,20,2019-09-03
>     //1001,1,10,200,2019-09-03
>     //1001,1,10,2000,2019-09-03
>     val events = lines
>       .as[(String, Timestamp)]
>       .map { case (line, timestamp) => {
>         val orderInfo = line.split(",")
>         if (orderInfo != null && orderInfo.size > 4) {
>           val objEvent = NOEvent(orderInfo(0), orderInfo(1).toInt, orderInfo(2), orderInfo(3).toDouble, orderInfo(4), timestamp)
>           objEvent
>         } else {
>           null
>         }
>       }
>       }.filter(obj => obj != null)
>     /**
>       * -次维护订单状态数据,返回最新的一个订单,并把上一个订单金额附加返回
>       */
>     val orderUpdates = events
>       .groupByKey(event => event.orderId)
>       //orderInfoStore=输入的状态类型,orderInfoStoreUpdate=输出的状态类型
>       .mapGroupsWithState[orderInfoStore, orderInfoStoreUpdate](GroupStateTimeout.ProcessingTimeTimeout) {
>       case (orderId: String, events: Iterator[NOEvent], state: GroupState[orderInfoStore]) =>
>         // 如果时间超时,更新缓存
>         if (state.hasTimedOut) {
>           //时间过了,删除sotre里的数据,把expired=true,返回在线时间等信息 0:00-0:10可以设这个时间,不计算,干掉数据
>           val finalUpdate =
>             orderInfoStoreUpdate(orderId, state.get.otype, state.get.storeId, state.get.money, 0.0, state.get.orderDate, state.get.timestamp, expired = true)
>           state.remove()
>           finalUpdate
>         } else {
>           //订单没有超时,如果id存在,则替换掉,使用新的订单数据,或作别的操作
>           var oldOrder = 0.0 //上一笔的金额
>           val lastEnvent = events.toSeq.last
>           val updatedSession = if (state.exists) {
>             oldOrder = state.get.money
>             //存在,算出旧的金额是多少
>             orderInfoStore(orderId, lastEnvent.otype, lastEnvent.storeId, lastEnvent.money, oldOrder, lastEnvent.orderDate, lastEnvent.timestamp)
>           } else {
>             orderInfoStore(orderId, lastEnvent.otype, lastEnvent.storeId, lastEnvent.money, 0, lastEnvent.orderDate, lastEnvent.timestamp)
>           }
>           //更新缓存里面的这条数据信息
>           state.update(updatedSession)
>           // Set timeout such that the session will be expired if no data received for 10 seconds
>           state.setTimeoutDuration("3600 seconds")
>           orderInfoStoreUpdate(orderId, state.get.otype, state.get.storeId, state.get.money, oldOrder, state.get.orderDate, state.get.timestamp, expired = false)
>         }
>     }
>     /**
>       * 二次计算出门店的数据
>       */
>     val storeUpdate = orderUpdates.groupByKey(order => order.storeId).mapGroupsWithState[g1InfoStore, g1InfoStoreUpdate](GroupStateTimeout.ProcessingTimeTimeout) {
>       case (storeId: String, events: Iterator[orderInfoStoreUpdate], state: GroupState[g1InfoStore]) =>
>         // 如果时间超时,更新缓存
>         if (state.hasTimedOut) {
>           //时间过了,删除sotre里的数据,把expired=true,返回在线时间等信息 0:00-0:10可以设这个时间,不计算,干掉数据
>           val finalUpdate =
>             g1InfoStoreUpdate(storeId, state.get.num, state.get.money, state.get.timestamp, expired = true)
>           state.remove()
>           finalUpdate
>         } else {
>           var storeNum = events.map(_.orderId).size
>           var storeMoney = events.map(_.money).reduce(_ + _) //其实只会一个
>           val updatedStore = if (state.exists) { //门店存在,
>             val old_order_moneys = events.map(_.oldMoney).reduce(_ + _) //其实只会一个
>             //门店总客+=新订单金额-旧订单
>             storeMoney = state.get.money + storeMoney - old_order_moneys
>             g1InfoStore(storeId, state.get.num + events.map(_.orderId).size, storeMoney, state.get.timestamp)
>           } else {
>             g1InfoStore(storeId, storeNum, storeMoney, state.get.timestamp)
>           }
>           //更新缓存里面的这条数据信息
>           state.update(updatedStore)
>           // Set timeout such that the session will be expired if no data received for 10 seconds
>           state.setTimeoutDuration("3600 seconds")
>           g1InfoStoreUpdate(storeId, state.get.num, state.get.money, state.get.timestamp, expired = true)
>         }
>     }
>     //门店统计好的数据在汇总
> //    storeUpdate.createOrReplaceTempView("update_tmp")
> //    spark.sql("select storeId,otype, count(1) num,sum(money) as moneys ,sum(oldMoney) as oldMoney  from update_tmp group by storeId,otype ")
>     val query = storeUpdate
>       .writeStream
>       .outputMode("update")
>       .format("console")
>       .start()
>     query.awaitTermination()
>   }
> }
> /** User-defined data type representing the input events */
> case class NOEvent(orderId: String, otype: Int, storeId: String, money: Double, orderDate: String, timestamp: Timestamp)
> case class orderInfoStore(orderId: String, otype: Int, storeId: String, money: Double, oldMoney: Double, orderDate: String, timestamp: Timestamp)
> case class orderInfoStoreUpdate(orderId: String, otype: Int, storeId: String, money: Double, oldMoney: Double, orderDate: String, timestamp: Timestamp,
>                                 expired: Boolean)
> /** 第一个分组信息 */
> case class g1InfoStore(storeid: String,/* otype: Int,*/ num: Int, money: Double, timestamp: Timestamp)
> case class g1InfoStoreUpdate(storeid: String,/* otype: Int,*/ num: Int, money: Double, timestamp: Timestamp, expired: Boolean)
> {code}
>  
>  
>  
> I changed the way to maintain a list of orders in the store, and then to maintain the list and re-operate the calculation. However, in this way, I will maintain the details of the first-line data, and I will temporarily use many resources. I wonder if there is any other way to deal with this kind of data.
> {code:java}
> package org.roy.demo.streaming.bus
> import java.sql.Timestamp
> import org.apache.log4j.{Level, Logger}
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions.current_timestamp
> import org.apache.spark.sql.streaming._
> import streaming.StreamingExamples
> /**
>   * create by Roy 2019/09/06
>   * Counting day order number and amount
>   */
> object StructuredStoreOrderState {
>   Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
>   StreamingExamples.setStreamingLogLevels()
>   def main(args: Array[String]): Unit = {
>     val spark = SparkSession
>       .builder.master("local[*]")
>       .appName("StructuredSessionization")
>       .getOrCreate()
>     import spark.implicits._
>     // Create DataFrame representing the stream of input lines from connection to host:port
>     val lines = spark.readStream
>       .format("socket")
>       .option("host", "10.200.102.192")
>       .option("port", 9998)
>       .load().withColumn("current_timestamp", current_timestamp)
>     //100,1,10,20,2019-09-03
>     //1001,1,10,200,2019-09-03
>     //1001,1,10,2000,2019-09-03
>     val events = lines
>       .as[(String, Timestamp)]
>       .map { case (line, timestamp) => {
>         val orderInfo = line.split(",")
>         if (orderInfo != null && orderInfo.size > 4) {
>           val objEvent = dataEvent(orderInfo(0), orderInfo(1).toInt, orderInfo(2), orderInfo(3).toDouble, orderInfo(4), timestamp)
>           objEvent
>         } else {
>           null
>         }
>       }
>       }.filter(obj => obj != null)
>     val orderUpdates = events
>       .groupByKey(event => event.storeId)
>       .mapGroupsWithState[storeOrderInfoState, storeOrderInfoStateUpdate](GroupStateTimeout.ProcessingTimeTimeout) {
>       case (key: String, values: Iterator[dataEvent], state: GroupState[storeOrderInfoState]) =>
>         val seqs = values.toSeq
>         val times = seqs.map(_.timestamp).seq
>         val max_time = new Timestamp(System.currentTimeMillis())
>         if (state.hasTimedOut) {
>           val finalUpdate =
>             storeOrderInfoStateUpdate(key, state.get.orderNum, state.get.orderMoney, max_time, expired = true)
>           state.remove()
>           finalUpdate
>         } else {
>           val updatedSession = if (state.exists) {
>             val stateMap = state.get.orderInfoStoreMap
>             var norderMap2: Map[String, Double] = Map()
>             var num = 0
>             var money = 0.0
>             seqs.foreach(e => {
>               if (stateMap.contains(e.orderId)) {
>                 //新订单-旧订单,再进行+总合
>                 money += e.money - stateMap.get(e.orderId).get
>               } else {
>                 num += 1
>                 money += e.money
>               }
>               norderMap2 += (e.orderId -> e.money)
>             })
>             //取出所有的订单+流进来的订单,需要判断是否有重复订单
>             storeOrderInfoState(key, state.get.orderNum + num, state.get.orderMoney + money, stateMap ++ norderMap2, max_time)
>           } else {
>             var norderMap2: Map[String, Double] = Map()
>             var money = 0.0
>             seqs.foreach(e => {
>               money += e.money
>               norderMap2 += (e.orderId -> e.money)
>             })
>             storeOrderInfoState(key, norderMap2.size, money, norderMap2, max_time)
>           }
>           //更新缓存里面的这条数据信息
>           println("updatedSession" + updatedSession)
>           println(updatedSession.orderMoney)
>           state.update(updatedSession)
>           // Set timeout such that the session will be expired if no data received for 10 seconds
>           state.setTimeoutDuration("3600 seconds")
>           storeOrderInfoStateUpdate(key, state.get.orderNum, state.get.orderMoney, max_time, expired = false)
>         }
>     }
>     val query = orderUpdates
>       .writeStream
>       .outputMode("update")
>       .format("console")
>       .start()
>     query.awaitTermination()
>   }
> }
> /** User-defined data type representing the input events */
> case class dataEvent(orderId: String, otype: Int, storeId: String, money: Double, orderTime: String, timestamp: Timestamp)
> //最一个订单状态
> case class orderEnventInfo(orderId: String, otype: Int, storeId: String, money: Double, orderTime: String, timestamp: Timestamp)
> //门店里面,维护一张所有订单,sotre  orderInfoStoreMap: Map[String, orderEnventInfo]  Seq[orderEnventInfo]
> case class storeOrderInfoState(storeId: String, orderNum: Int, orderMoney: Double, orderInfoStoreMap: Map[String, Double], timestamp: Timestamp)
> //返回计算后的结果
> case class storeOrderInfoStateUpdate(storeId: String, orderNum: Int, orderMoney: Double, timestamp: Timestamp,
>                                      expired: Boolean)
> {code}
> The final output I need is this
>   !微信图片_20190906170459.png!



--
This message was sent by Atlassian Jira
(v8.3.2#803003)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org