Posted to issues@flink.apache.org by "Timo Walther (JIRA)" <ji...@apache.org> on 2018/11/19 14:03:00 UTC
[jira] [Commented] (FLINK-10915) clojure context.collectWithTimestamp Will be blocked.
[ https://issues.apache.org/jira/browse/FLINK-10915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16691727#comment-16691727 ]
Timo Walther commented on FLINK-10915:
--------------------------------------
Thanks for reporting this issue [~xingzhe]. Unfortunately, Clojure is not officially supported by the Flink community. If there are problems like the one you reported, they should probably be handled by a wrapping Flink-Clojure framework or library. Therefore, I will close this issue for now.
> clojure context.collectWithTimestamp Will be blocked.
> --------------------------------------------------------
>
> Key: FLINK-10915
> URL: https://issues.apache.org/jira/browse/FLINK-10915
> Project: Flink
> Issue Type: Improvement
> Components: Client
> Affects Versions: 1.6.2
> Reporter: fengge
> Assignee: Timo Walther
> Priority: Minor
>
> {code:java}
> (deftype Cflatmapfunction [] FlatMapFunction
>   (flatMap [this value collector]
>     (log/info "value:" (type value) value)
>     (let [tomap (into {} value)
>           {:keys [shopid shopname]} (ym/readstring (get tomap "body"))]
>       ;(.collect collector (Tuple5. msgid "orgidid" "toporgid" "(ym/readstring body)" 1))
>       (.collect collector (Tuple3. shopid shopname (int 1))))))
> ;;; The problem is here: a FlatMapFunction implemented in Clojure blocks when run on a cluster, but a local JVM run works fine.
> {code}
> {code:java}
> (defn -main [& args]
>   (let [flink-env (StreamExecutionEnvironment/getExecutionEnvironment)
>         _ (.enableCheckpointing flink-env 13000)
>         sources (.addSource flink-env
>                             (RocketMQSource. (SimpleKeyValueDeserializationSchema. "msgid" "body")
>                                              (gen-consumer-properties)))
>         _ (.name sources "ririri")
>         _ (.setParallelism sources 1)
>         ednds (.returns (.flatMap sources (Cflatmapfunction.)) CloudTuple3)
>         _ (.name ednds "ccc")
>         _ (.setParallelism ednds 1)
>         _ (.print ednds)
>         ;counts (.sum (.timeWindow (.keyBy ednds (int-array [0])) (Time/seconds 10)) 2)
>         ;secondcounts (.sum (.timeWindow (.keyBy counts (int-array [0])) (Time/minutes 2)) 2)
>         ]
>     (prn "Starting stateful stream computation 1" flink-env)
>     ;(.setParallelism ds 1)
>     ;(.setParallelism ednds 1)
>     ;(.print counts)
>     ;(.print secondcounts)
>     (.execute flink-env "rocketmq-flink-feng2")))
> {code}
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)