You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Timo Walther (JIRA)" <ji...@apache.org> on 2018/11/19 14:03:00 UTC

[jira] [Commented] (FLINK-10915) clojure context.collectWithTimestamp Will be blocked.

    [ https://issues.apache.org/jira/browse/FLINK-10915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16691727#comment-16691727 ] 

Timo Walther commented on FLINK-10915:
--------------------------------------

Thanks for reporting this issue [~xingzhe]. Unfortunately, Clojure is not officially supported by the Flink community. If there are are problems like the one you reported, it should probably be handled by some wrapping Flink-Clojure framework/library. Therefore, I will close this issue for now.

> clojure   context.collectWithTimestamp  Will be blocked.
> --------------------------------------------------------
>
>                 Key: FLINK-10915
>                 URL: https://issues.apache.org/jira/browse/FLINK-10915
>             Project: Flink
>          Issue Type: Improvement
>          Components: Client
>    Affects Versions: 1.6.2
>            Reporter: fengge
>            Assignee: Timo Walther
>            Priority: Minor
>
> {code:java}
> (deftype Cflatmapfunction [] FlatMapFunction
>   (flatMap [this value collector]
>     (log/info "value:" (type value) value )
>     (let [tomap (into {} value)
>           {:keys [shopid  shopname]} (ym/readstring (get tomap "body"))]
>       ;(.collect collector (Tuple5. msgid "orgidid" "toporgid" "(ym/readstring body)" 1))
>       (.collect collector (Tuple3. shopid shopname (int 1)))
>       )
>     ))
> ;;;The problem is here... Clojure realizes that FlatMapFunction will block in clusters.  but local jvm run is ok ..
> {code}
> {code:java}
> (defn -main [& args]
>   (let [flink-env (StreamExecutionEnvironment/getExecutionEnvironment)
>         _ (.enableCheckpointing flink-env 13000)
>         sources (.addSource flink-env
>                             (RocketMQSource. (SimpleKeyValueDeserializationSchema. "msgid" "body") 
>                                              (gen-consumer-properties)))
>         _ (.name sources "ririri")
>         _ (.setParallelism sources 1)
>         ednds (.returns (.flatMap sources (Cflatmapfunction.)) CloudTuple3)
>         _ (.name ednds "ccc")
>         _ (.setParallelism ednds 1)
>         _   (.print ednds)
>         ;counts (.sum (.timeWindow(.keyBy ednds (int-array [0])) (Time/seconds 10))  2)
>         ;secondcounts (.sum (.timeWindow(.keyBy counts (int-array [0])) (Time/minutes 2))  2)
>         ]
>     (prn "开始有状态的流式计算1" flink-env)
>     ;(.setParallelism ds 1)
>     ;(.setParallelism ednds 1)
>     ;(.print counts)
>     ;(.print secondcounts)
>     (.execute flink-env"rocketmq-flink-feng2")
>     )
>   )
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)