Posted to dev@rocketmq.apache.org by Xin Wang <da...@gmail.com> on 2018/03/21 12:34:03 UTC

Re: Question about rocketmq-spark

Hi,
Sorry for missing this email. Since this is a known issue, I am CC'ing the dev mailing list to reduce the impact on other users.

This is a bug in RocketMQ-Spark that was fixed last year (details:
https://github.com/apache/rocketmq-externals/pull/36).
Upgrading to the latest version resolves the problem.
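The question quoted below traces an OutOfMemoryError back to the receiver buffering messages faster than Spark can store them. The exact change in PR #36 is not reproduced here; the sketch below only illustrates the general remedy for this failure mode, backpressure: a bounded queue makes the fetching side wait (or give up) instead of growing the heap without limit. The class and method names are illustrative, not taken from rocketmq-externals:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedBuffer {
    // A tiny fixed capacity stands in for a realistic bound (e.g. a few thousand batches).
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);

    // Try to enqueue a batch; when the queue is full the producer is held back
    // (offer times out) instead of allocating ever more heap.
    public boolean enqueue(String batch) throws InterruptedException {
        return queue.offer(batch, 10, TimeUnit.MILLISECONDS);
    }

    // Consumer side: take the next batch, or null if nothing is buffered.
    public String drain() {
        return queue.poll();
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedBuffer buf = new BoundedBuffer();
        System.out.println(buf.enqueue("batch-1")); // true
        System.out.println(buf.enqueue("batch-2")); // true
        // Queue is full: the offer times out and reports failure rather than growing.
        System.out.println(buf.enqueue("batch-3")); // false
    }
}
```

With an unbounded LinkedBlockingQueue in the same spot, a stalled consumer (here, a blocked store() call) would let the queue absorb every fetched batch until the heap is exhausted, which matches the OOM reported below.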

BTW, RocketMQ-Spark will undergo a round of optimization soon, to give RocketMQ users a more stable and reliable big-data integration component.

Thanks.

On Oct 14, 2017 at 7:53 PM, Yingjun Li 李颖俊 <yi...@midea.com> wrote:

> Hi,
>     I have recently been using the spark-streaming plugin from the apache/rocketmq-externals
> project, and I learned on GitHub that the consumer push mode was implemented and merged by you.
> May I trouble you with a few questions?
>     My runtime environment is described in the attachment.
>
>
>     JavaInputDStream<Message> stream = RocketMqUtils.createJavaReliableMQPushStream(
>             jscc, pushConsumerProperties, StorageLevel.MEMORY_AND_DISK_SER());
>
>     I am using the reliable push mode. Data does come out of RocketMQ and gets stored into the
>     queue BlockingQueue<MessageSet> queue;
>     but at this step,
>
>     try {
>         // According to the official docs:
>         // 'To implement a reliable receiver, you have to use
>         // store(multiple-records) to store data'
>         ReliableRocketMQReceiver.this.store(messageSet);
>         ack(messageSet.getId());
>     } catch (Exception e) {
>         fail(messageSet.getId());
>     }
>      the MessageSender thread just hangs, and after a while an OutOfMemoryError occurs, as follows:
>
>      java.lang.OutOfMemoryError: Java heap space
>         at scala.reflect.ManifestFactory$$anon$2.newArray(Manifest.scala:177)
>         at scala.reflect.ManifestFactory$$anon$2.newArray(Manifest.scala:176)
>         at org.apache.spark.util.collection.PrimitiveVector.copyArrayWithLength(PrimitiveVector.scala:87)
>         at org.apache.spark.util.collection.PrimitiveVector.resize(PrimitiveVector.scala:74)
>         at org.apache.spark.util.collection.SizeTrackingVector.resize(SizeTrackingVector.scala:35)
>         at org.apache.spark.util.collection.PrimitiveVector.$plus$eq(PrimitiveVector.scala:41)
>         at org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:30)
>         at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
>         at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1038)
>         at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1029)
>         at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
>         at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
>         at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:792)
>         at org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:84)
>         at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:158)
>         at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushIterator(ReceiverSupervisorImpl.scala:138)
>         at org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:152)
>         at org.apache.rocketmq.spark.streaming.ReliableRocketMQReceiver$MessageSender.run(ReliableRocketMQReceiver.java:119)
>
>    A heap dump shows the problem of too many List<Object> instances, even though I had only inserted 500 records into RocketMQ.
>     num     #instances         #bytes  class name
> ----------------------------------------------
>    1:          5005      537537816  [Ljava.lang.Object;
>    2:          1664      135648072  [B
>    3:         30846        2588360  [C
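A quick sanity check on the histogram above (this block only redoes the arithmetic; the figures are copied from the report): about 537 MB spread over 5,005 Object[] instances averages roughly 107 KB per array, far more than 500 small messages should need, which is consistent with batches accumulating inside the receiver.

```java
public class HistogramCheck {
    // Figures copied from the jmap histogram in the email.
    static final long INSTANCES = 5_005L;
    static final long BYTES = 537_537_816L;

    // Average size of a single Object[] instance, in bytes.
    static long avgBytesPerInstance() {
        return BYTES / INSTANCES;
    }

    public static void main(String[] args) {
        // Roughly 107 KB per array on average.
        System.out.println("avg bytes per Object[]: " + avgBytesPerInstance());
    }
}
```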
>
>     I am stuck at this point; my only guess is that it may be related to the private List<MessageExt> data field inside MessageSet.
>     If you have time when you see this email, I would really appreciate your help. Thanks!
>
>                                                                       李颖俊
>
>
> 2017.10.14
>
>
>




-- 
Thanks,
Xin
