Posted to user-zh@flink.apache.org by smq <37...@qq.com> on 2021/03/10 05:37:26 UTC

Re: MapState cannot be updated

Could you post the complete code?





------------------ Original Message ------------------
From: chaos <jinhshi@163.com>
Sent: 2021-03-10 12:51
To: user-zh <user-zh@flink.apache.org>
Subject: Re: MapState cannot be updated



Hello, I defined a MapState while working with a broadcast stream and put data into it in my processing logic, but I can never get its value to update. I would appreciate any help.

Definition:
private val carEfenceState: MapState[String, Boolean] = new
MapStateDescriptor[String, Boolean]("carEfenceState", classOf[String],
classOf[Boolean])

Put:
carEfenceState.put(mapKey, true)

Get:
carEfenceState.get(mapKey)

The value read back is always false.

Thanks in advance!



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re: MapState cannot be updated

Posted by 明启 孙 <37...@qq.com>.
/**
 * A read-only view of the {@link BroadcastState}.
 *
 * <p>Although read-only, the user code should not modify the value returned by the {@link
 * #get(Object)} or the entries of the immutable iterator returned by the {@link
 * #immutableEntries()}, as this can lead to inconsistent states. The reason for this is that we do
 * not create extra copies of the elements for performance reasons.
 *
 * @param <K> The key type of the elements in the {@link ReadOnlyBroadcastState}.
 * @param <V> The value type of the elements in the {@link ReadOnlyBroadcastState}.
 */
This is the description of ReadOnlyBroadcastState in the source code. I hope it helps.
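
The "no extra copies" warning in that Javadoc can be illustrated with a small analogy. This is only a sketch built on java.util collections, not Flink's implementation: ReadOnlyViewDemo, backing and view are hypothetical names, and Collections.unmodifiableMap stands in for the read-only view. The view rejects writes to the map itself, yet the values it hands out are the same objects the backing map holds, so mutating them changes the underlying state.

```scala
import java.util.{ArrayList => JArrayList, Arrays => JArrays, Collections, HashMap => JHashMap, Map => JMap}

object ReadOnlyViewDemo {
  // Backing "state": one mutable list of rule names per key.
  private val backing = new JHashMap[String, JArrayList[String]]()
  backing.put("rules", new JArrayList[String](JArrays.asList("rule-1")))

  // Read-only view: put/remove on the view throw, but values are not copied.
  val view: JMap[String, JArrayList[String]] = Collections.unmodifiableMap(backing)

  // Number of rules as seen by the owner of the backing map.
  def backingSize: Int = backing.get("rules").size()
}
```

Calling ReadOnlyViewDemo.view.get("rules").add("rule-2") succeeds and raises backingSize from 1 to 2, which is the kind of silent modification the Javadoc warns against.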


smq

From: chaos
Sent: 2021-03-10 14:29
To: user-zh@flink.apache.org
Subject: Re: Re: MapState cannot be updated

The main code is as follows:

import java.util.Map

import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.util.Collector

class getRule extends KeyedBroadcastProcessFunction[String, KafkaStreamSource, List[Rule], KafkaStreamSource] {

  // Keyed state handle, initialized in open()
  private var carEfenceState: MapState[String, Boolean] = _

  override def open(parameters: Configuration): Unit = {
    carEfenceState = getRuntimeContext.getMapState(
      new MapStateDescriptor[String, Boolean]("carEfenceState", classOf[String], classOf[Boolean]))
  }

  override def processBroadcastElement(
      in2: List[Rule],
      context: KeyedBroadcastProcessFunction[String, KafkaStreamSource, List[Rule], KafkaStreamSource]#Context,
      collector: Collector[KafkaStreamSource]): Unit = {
    // Store the latest rule list in broadcast state
    context.getBroadcastState(ruleStateDescriptor).put("rules", in2)
  }

  override def processElement(
      kafkaSource: KafkaStreamSource,
      readOnlyContext: KeyedBroadcastProcessFunction[String, KafkaStreamSource, List[Rule], KafkaStreamSource]#ReadOnlyContext,
      collector: Collector[KafkaStreamSource]): Unit = {

    val ruleIterator = readOnlyContext.getBroadcastState(ruleStateDescriptor).immutableEntries().iterator()
    while (ruleIterator.hasNext) {
      val ruleMap: Map.Entry[String, List[Rule]] = ruleIterator.next()
      val ruleList: List[Rule] = ruleMap.getValue

      for (rule <- ruleList) {

        val mapKey = kafkaSource.vno + rule.id
        val tempState = carEfenceState.get(mapKey)
        val currentState = if (tempState != null) tempState else false
        // business logic
        if (!currentState) {
          ...
          carEfenceState.put(mapKey, true)
          ...
        } else if (currentState) {
          ...
          carEfenceState.remove(mapKey)
          ...
        }
      }
    }
  }
}
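
A side note on the tempState != null check in that listing. With Scala's Boolean as the value type, a missing key returned as null by a Java generic API is unboxed to false, so the null test cannot distinguish "absent" from "stored false". The thread does not confirm this as the cause of the reported symptom. The sketch below only demonstrates the unboxing behaviour, using java.util.HashMap as a stand-in for MapState; MissingKeyDemo, readLikeThread and readExplicit are hypothetical names.

```scala
import java.util.{HashMap => JHashMap}

object MissingKeyDemo {
  // Stand-in for MapState[String, Boolean]: a Java map whose value type is Scala's Boolean.
  val state = new JHashMap[String, Boolean]()

  // Mirrors the read in the listing: a missing key comes back from Java as null,
  // which Scala unboxes to false because the static type is Boolean.
  def readLikeThread(key: String): Boolean = state.get(key)

  // Distinguishes "absent" from "stored false" by asking the map first.
  def readExplicit(key: String): Option[Boolean] =
    if (state.containsKey(key)) Some(state.get(key)) else None
}
```

Flink's MapState offers contains(key) for the same purpose, so a presence check there does not have to rely on null.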





Re: Re: Re: MapState cannot be updated

Posted by chaos <ji...@163.com>.
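Thanks for the reply; the problem is solved.

Solution:

Following an example from the official documentation, I moved the state lookup inside processElement.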

  private val eFenceMapStateDesc =
    new MapStateDescriptor[String, Boolean]("carEfenceState", classOf[String], classOf[Boolean])
  private val DbIdMapStateDesc =
    new MapStateDescriptor[String, Long]("eFenceCarDbIdState", classOf[String], classOf[Long])

  override def processElement(...) {
    val eFenceMapState = getRuntimeContext.getMapState(eFenceMapStateDesc)
    val dbIdMapState = getRuntimeContext.getMapState(DbIdMapStateDesc)

    ....
  }
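
For readers who want the fix in a complete form, here is a minimal sketch of a KeyedBroadcastProcessFunction that looks up its keyed MapState inside processElement. It assumes Flink is on the classpath. CarEvent, FenceRule, FenceDescriptors and FenceFunction are hypothetical stand-ins for the types in the thread, and the value type is java.lang.Boolean rather than Scala's Boolean. The thread does not explain why the lookup in open() failed, so this only mirrors the reported fix.

```scala
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.util.Collector

// Hypothetical event and rule types, standing in for KafkaStreamSource and Rule.
case class CarEvent(vno: String)
case class FenceRule(id: String)

object FenceDescriptors {
  // Keyed state: whether a (vehicle, rule) pair is currently flagged.
  val fenceState = new MapStateDescriptor[String, java.lang.Boolean](
    "carEfenceState", classOf[String], classOf[java.lang.Boolean])
  // Broadcast state: rule id -> rule (assumed to match the descriptor given to broadcast()).
  val rules = new MapStateDescriptor[String, FenceRule](
    "rules", classOf[String], classOf[FenceRule])
}

class FenceFunction
    extends KeyedBroadcastProcessFunction[String, CarEvent, FenceRule, String] {

  override def processBroadcastElement(
      rule: FenceRule,
      ctx: KeyedBroadcastProcessFunction[String, CarEvent, FenceRule, String]#Context,
      out: Collector[String]): Unit = {
    // Broadcast state is writable only on the broadcast side.
    ctx.getBroadcastState(FenceDescriptors.rules).put(rule.id, rule)
  }

  override def processElement(
      event: CarEvent,
      ctx: KeyedBroadcastProcessFunction[String, CarEvent, FenceRule, String]#ReadOnlyContext,
      out: Collector[String]): Unit = {
    // Keyed state is looked up on each call, as in the fix reported above.
    val fence = getRuntimeContext.getMapState(FenceDescriptors.fenceState)
    val it = ctx.getBroadcastState(FenceDescriptors.rules).immutableEntries().iterator()
    while (it.hasNext) {
      val rule = it.next().getValue
      val mapKey = event.vno + rule.id
      if (fence.contains(mapKey)) {
        // Already flagged: clear the entry, mirroring the remove branch in the thread.
        fence.remove(mapKey)
        out.collect(s"cleared $mapKey")
      } else {
        // Not yet flagged: record it, mirroring the put branch in the thread.
        fence.put(mapKey, java.lang.Boolean.TRUE)
        out.collect(s"flagged $mapKey")
      }
    }
  }
}
```

Using contains(mapKey) instead of a null comparison keeps the presence check independent of how a missing value is unboxed.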





Re: Re: MapState cannot be updated

Posted by 明启 孙 <37...@qq.com>.
Although you changed the order in which the methods are implemented, the program still runs processElement() first and processBroadcastElement() afterwards.



