Posted to dev@rocketmq.apache.org by Xin Wang <da...@gmail.com> on 2018/03/21 12:34:03 UTC
Re: Question about a rocketmq-spark issue
Hello,
Sorry for missing this email. Since this is a known issue, I am CC'ing the dev mailing list to reduce its impact on other users.
This is a bug in RocketMQ-Spark that was fixed last year (details:
https://github.com/apache/rocketmq-externals/pull/36)
Using the latest version resolves this issue.
BTW, RocketMQ-Spark will go through a round of optimization soon, to give RocketMQ users a more stable and reliable big-data integration component.
Thanks.
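For anyone who cannot upgrade right away, the failure mode described below is worth understanding: the receiver buffers message batches in memory between the consumer callback and the thread that calls store(), and if that buffer is unbounded, a stalled store() lets batches pile up until the heap is exhausted. Here is a minimal, self-contained sketch of the bounded-buffer pattern that prevents this (the batch type, names, and capacity are made up for illustration; this is not the actual PR #36 patch):

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch: a bounded queue between the consumer callback and the sender
// thread. When the sender (which would call Spark's store()) stalls,
// put() blocks the consumer instead of letting batches accumulate to OOM.
public class BoundedBufferSketch {
    // A fixed capacity bounds worst-case memory; an unbounded queue would not.
    private final BlockingQueue<List<String>> queue = new ArrayBlockingQueue<>(100);

    // Consumer side: blocks when the buffer is full (backpressure).
    public void onBatch(List<String> batch) throws InterruptedException {
        queue.put(batch);
    }

    // Sender side: takes the next batch to hand to store().
    public List<String> nextBatch() throws InterruptedException {
        return queue.take();
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedBufferSketch buf = new BoundedBufferSketch();
        buf.onBatch(List.of("m1", "m2"));
        System.out.println(buf.nextBatch()); // prints [m1, m2]
    }
}
```

A real receiver would additionally re-enqueue or fail() the batch when store() throws, so nothing is lost while backpressure is applied.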
On 14 October 2017 at 19:53, Yingjun Li 李颖俊 <yi...@midea.com> wrote:
> Hello:
> I have recently been using the spark-streaming plugin in the
> apache/rocketmq-externals project, and I saw on GitHub that the consumer
> push mode was implemented and merged by you. May I trouble you with a few
> questions?
> My runtime environment is in the attachment.
>
>
> JavaInputDStream<Message> stream = RocketMqUtils.createJavaReliableMQPushStream(
>         jscc, pushConsumerProperties, StorageLevel.MEMORY_AND_DISK_SER());
>
> 我采用的是reliable的push mode,数据可以从rocketMQ出来存进去队列BlockingQueue<MessageSet>
> queue;
> 但是到了这一步
>
> try {
>     // According to the official docs:
>     // 'To implement a reliable receiver, you have to use
>     // store(multiple-records) to store data'
>     ReliableRocketMQReceiver.this.store(messageSet);
>     ack(messageSet.getId());
> } catch (Exception e) {
>     fail(messageSet.getId());
> }
> the MessageSender thread just gets stuck, and after a while an OOM error occurs, as follows:
>
> java.lang.OutOfMemoryError: Java heap space
>     at scala.reflect.ManifestFactory$$anon$2.newArray(Manifest.scala:177)
>     at scala.reflect.ManifestFactory$$anon$2.newArray(Manifest.scala:176)
>     at org.apache.spark.util.collection.PrimitiveVector.copyArrayWithLength(PrimitiveVector.scala:87)
>     at org.apache.spark.util.collection.PrimitiveVector.resize(PrimitiveVector.scala:74)
>     at org.apache.spark.util.collection.SizeTrackingVector.resize(SizeTrackingVector.scala:35)
>     at org.apache.spark.util.collection.PrimitiveVector.$plus$eq(PrimitiveVector.scala:41)
>     at org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:30)
>     at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1038)
>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1029)
>     at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
>     at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
>     at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:792)
>     at org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:84)
>     at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:158)
>     at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushIterator(ReceiverSupervisorImpl.scala:138)
>     at org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:152)
>     at org.apache.rocketmq.spark.streaming.ReliableRocketMQReceiver$MessageSender.run(ReliableRocketMQReceiver.java:119)
>
> From a heap dump I found an excessive number of List<Object> instances, even though I had only inserted 500 records into RocketMQ.
>  num     #instances         #bytes  class name
> ----------------------------------------------
>    1:          5005      537537816  [Ljava.lang.Object;
>    2:          1664      135648072  [B
>    3:         30846        2588360  [C
>
> At this point I am stuck; I can only guess that it might be related to the
> private List<MessageExt> data field inside MessageSet.
> If you have time when you see this email, I would really appreciate your
> help. Thanks!
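A note on the histogram above: [Ljava.lang.Object; is the JVM's name for Object[], the backing-array type of ArrayList-style collections (including Spark's PrimitiveVector), and [B is byte[], typically the message bodies. So even a few hundred retained batches can pin hundreds of MB as long as nothing drains the buffer. A tiny self-contained illustration (no rocketmq-spark APIs; names and sizes are arbitrary):

```java
import java.util.ArrayList;
import java.util.List;

// Each retained list pins its Object[] backing array plus every payload it
// references; memory is reclaimed only once the list itself is dropped.
public class RetentionDemo {
    // Fill a list with n 1 KiB payloads and return it; nothing is freed.
    public static List<byte[]> fill(int n) {
        List<byte[]> buffer = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            buffer.add(new byte[1024]);
        }
        return buffer;
    }

    public static void main(String[] args) {
        // 500 retained 1 KiB payloads pin ~512 KiB; scale the payload size up
        // and a stalled consumer heads toward OutOfMemoryError quickly.
        System.out.println(fill(500).size()); // prints 500
    }
}
```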
>
> 李颖俊
>
>
> 2017.10.14
>
>
>
--
Thanks,
Xin