You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by ri...@sina.cn on 2017/03/24 09:59:52 UTC

回复:Re: flink Broadcast

yes,it is YARN single job,use the commend:
flink-1.1.1/bin/flink run -m yarn-cluster \-yn 2 \-ys 2 \-yjm 2048 \-ytm 2048 \--class statics.ComputeDocSim \--classpath file:///opt/cloudera/parcels/CDH/lib/hbase/hbase-client.jar \--classpath file:///opt/cloudera/parcels/CDH/lib/hbase/hbase-protocol.jar \--classpath file:///opt/cloudera/parcels/CDH/lib/hbase/hbase-common.jar \--classpath file:///opt/cloudera/parcels/CDH/jars/htrace-core-3.1.0-incubating.jar \--classpath file:///opt/cloudera/parcels/CDH/lib/hbase/lib/guava-12.0.1.jar \text-assembly-0.1.0.jar hdfs:///user/hadoop/tf-idf-ex hdfs:///user/hadoop/tf-idf-ex-sims
and code is: val to = //DataSet[(String, Vector)] val to = from.collect() val cosDistince = CosineDistanceMetric.apply() val res = from.map{x=>          val fromId = x._1          val docSims = to.filter(_._1!=fromId).map{y=>            val toId = y._1                      val score = 1-cosDistince.distance(x._2, y._2)            (toId,score)          }.toList.sortWith((x,y)=>x._2>y._2).take(20)         (fromId,docSims)       }res.writeAsText(..)
----- 原始邮件 -----
发件人:Stephan Ewen <se...@apache.org>
收件人:user@flink.apache.org
抄送人:亘谷 <ri...@sina.cn>
主题:Re: flink Broadcast
日期:2017年03月24日 17点40分

The program consists of two executions - one that only collects() back to the client, one that executes the map function.
Are you running this as a "YARN single job" execution? IN that case, there may be an issue that this incorrectly tries to submit to a stopping YARN cluster.


On Fri, Mar 24, 2017 at 10:32 AM, Robert Metzger <rm...@apache.org> wrote:
Hi,
Can you provide more logs to help us understand whats going on?
One note regarding your application: You are calling .collect() and send the collection with the map() call to the cluster again.This is pretty inefficient and can potentially break your application (in particular the RPC system of Flink).
I would recommend to use broadcast variables to send the dataset to the map operator: https://cwiki.apache.org/confluence/display/FLINK/Variables+Closures+vs.+Broadcast+Variables

On Thu, Mar 23, 2017 at 3:11 PM,  <ri...@sina.cn> wrote:
Hi ,alll,

i have a 36000 documents,and the document all transfer a vector , one doc is a vector,and dimension is the same,so have DataSet

------------------------

val data :DataSet[(String,SparseVector)]= ....//36000 record

val toData = data.collect()

val docSims = data.map{x=>

     val fromId=x._1

     val docsims = toData.filter{y=>y._1!=fromId}.map{y=>

          val score =1- cosDisticnce(x._2,y._2)

         (y._1,score)

     }.toList.sortWith{(a,b)=>a._2>b._2}.take(20)

   (fromId,docsims)

}

docSims.writeAsText(file)

.....

when run the job on yarn,it will get error ,the message is following:

       java.lang.InterruptedException  at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2017)

        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2052)

        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)

        at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:274)





someone can tell me ?thank you





Re: Re: flink Broadcast

Posted by Till Rohrmann <tr...@apache.org>.
Hi Rimin,

I've just tested a Flink application consisting of multiple jobs similar to
yours (using collect) with the `yarn-cluster` option and Flink 1.2.0 and it
seemed to work. The yarn cluster was only shut down after the last Flink
job has been executed. Could you maybe test if your problem still exists
with Flink 1.2.0?

Cheers,
Till

On Fri, Mar 24, 2017 at 10:59 AM, <ri...@sina.cn> wrote:

> yes,it is YARN single job,use the commend:
>
> flink-1.1.1/bin/flink run
> -m yarn-cluster \
> -yn 2 \
> -ys 2 \
> -yjm 2048 \
> -ytm 2048 \
> --class statics.ComputeDocSim \
> --classpath file:///opt/cloudera/parcels/CDH/lib/hbase/hbase-client.jar \
> --classpath file:///opt/cloudera/parcels/CDH/lib/hbase/hbase-protocol.jar
> \
> --classpath file:///opt/cloudera/parcels/CDH/lib/hbase/hbase-common.jar \
> --classpath file:///opt/cloudera/parcels/CDH/jars/htrace-core-3.1.0-incubating.jar
> \
> --classpath file:///opt/cloudera/parcels/CDH/lib/hbase/lib/guava-12.0.1.jar
> \
> text-assembly-0.1.0.jar hdfs:///user/hadoop/tf-idf-ex
> hdfs:///user/hadoop/tf-idf-ex-sims
>
> and code is:
>  val to = //DataSet[(String, Vector)]
>  val to = from.collect()
>  val cosDistince = CosineDistanceMetric.apply()
>  val res = from.map{x=>
>           val fromId = x._1
>           val docSims = to.filter(_._1!=fromId).map{y=>
>             val toId = y._1
>             val score = 1-cosDistince.distance(x._2, y._2)
>             (toId,score)
>           }.toList.sortWith((x,y)=>x._2>y._2).take(20)
>          (fromId,docSims)
>        }
> res.writeAsText(..)
>
> ----- 原始邮件 -----
> 发件人:Stephan Ewen <se...@apache.org>
> 收件人:user@flink.apache.org
> 抄送人:亘谷 <ri...@sina.cn>
> 主题:Re: flink Broadcast
> 日期:2017年03月24日 17点40分
>
> The program consists of two executions - one that only collects() back to
> the client, one that executes the map function.
>
> Are you running this as a "YARN single job" execution? IN that case, there
> may be an issue that this incorrectly tries to submit to a stopping YARN
> cluster.
>
>
>
> On Fri, Mar 24, 2017 at 10:32 AM, Robert Metzger <rm...@apache.org>
> wrote:
>
> Hi,
>
> Can you provide more logs to help us understand whats going on?
>
> One note regarding your application: You are calling .collect() and send
> the collection with the map() call to the cluster again.
> This is pretty inefficient and can potentially break your application (in
> particular the RPC system of Flink).
>
> I would recommend to use broadcast variables to send the dataset to the
> map operator: https://cwiki.apache.org/confluence/display/FLINK/
> Variables+Closures+vs.+Broadcast+Variables
>
>
> On Thu, Mar 23, 2017 at 3:11 PM, <ri...@sina.cn> wrote:
>
> Hi ,alll,
> i have a 36000 documents,and the document all transfer a vector , one doc
> is a vector,and dimension is the same,so have DataSet
> ------------------------
> val data :DataSet[(String,SparseVector)]= ....//36000 record
> val toData = data.collect()
> val docSims = data.map{x=>
>      val fromId=x._1
>      val docsims = toData.filter{y=>y._1!=fromId}.map{y=>
>           val score =1- cosDisticnce(x._2,y._2)
>          (y._1,score)
>      }.toList.sortWith{(a,b)=>a._2>b._2}.take(20)
>    (fromId,docsims)
> }
> docSims.writeAsText(file)
> .....
> when run the job on yarn,it will get error ,the message is following:
>        java.lang.InterruptedException  at java.util.concurrent.locks.Abs
> tractQueuedSynchronizer$ConditionObject.reportInterruptAfter
> Wait(AbstractQueuedSynchronizer.java:2017)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$Condit
> ionObject.await(AbstractQueuedSynchronizer.java:2052)
>         at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlocking
> Queue.java:442)
>         at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsync
> Impl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:274)
>
>
> someone can tell me ?thank you
>
>
>
>