You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Jerry Wong <je...@gmail.com> on 2015/10/29 19:45:47 UTC

Exception in thread "main" java.lang.IllegalArgumentException: Positive number of slices required

I used Spark 1.3.1 to populate the event logs into Cassandra. But there
is an exception whose cause I could not figure out. Can anybody give me
any help?

Exception in thread "main" java.lang.IllegalArgumentException: Positive
number of slices required
 at
org.apache.spark.rdd.ParallelCollectionRDD$.slice(ParallelCollectionRDD.scala:119)
 at
org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(ParallelCollectionRDD.scala:97)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
 at org.apache.spark.SparkContext.runJob(SparkContext.scala:1367)
 at org.apache.spark.rdd.RDD.collect(RDD.scala:797)
 at
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7$$anonfun$apply$1$$anonfun$apply$mcV$sp$2$$anonfun$apply$2$$anonfun$apply$mcV$sp$4$$anonfun$apply$3.apply$mcV$sp(EventLogClusterIngestor.scala:155)
 at scala.util.control.Breaks.breakable(Breaks.scala:37)
 at
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7$$anonfun$apply$1$$anonfun$apply$mcV$sp$2$$anonfun$apply$2$$anonfun$apply$mcV$sp$4.apply(EventLogClusterIngestor.scala:145)
 at
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7$$anonfun$apply$1$$anonfun$apply$mcV$sp$2$$anonfun$apply$2$$anonfun$apply$mcV$sp$4.apply(EventLogClusterIngestor.scala:144)
 at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7$$anonfun$apply$1$$anonfun$apply$mcV$sp$2$$anonfun$apply$2.apply$mcV$sp(EventLogClusterIngestor.scala:144)
 at scala.util.control.Breaks.breakable(Breaks.scala:37)
 at
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7$$anonfun$apply$1$$anonfun$apply$mcV$sp$2.apply(EventLogClusterIngestor.scala:139)
 at
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7$$anonfun$apply$1$$anonfun$apply$mcV$sp$2.apply(EventLogClusterIngestor.scala:132)
 at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7$$anonfun$apply$1.apply$mcV$sp(EventLogClusterIngestor.scala:132)
 at scala.util.control.Breaks.breakable(Breaks.scala:37)
 at
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7.apply(EventLogClusterIngestor.scala:125)
 at
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7.apply(EventLogClusterIngestor.scala:115)
 at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1.apply(EventLogClusterIngestor.scala:115)
 at
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1.apply(EventLogClusterIngestor.scala:107)
 at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$.processEventLogMapStreamDiff2(EventLogClusterIngestor.scala:107)
 at
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$.main(EventLogClusterIngestor.scala:573)
 at
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor.main(EventLogClusterIngestor.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

This happens inside an RDD foreach. I have pasted the relevant lines below:

132 difFileRDD.filter(a=>a.size>0).collect().foreach(file => {
. //...
135    val lines = sc.textFile("file:///" + file)
136    val elogs = lines.flatMap(_.split("\n"))
137    val numOfel = elogs.count()
138     //...
139    breakable {
140        if(numOfel <= 0) {
141 //......
142          break
143        }else{
144 elogs.filter(a=>a.size>0).foreach(log => {
145               breakable {
146      if (log == null || log.length == 0) {
147                  break
148                }else {
.       //Here is the logical processes
.     }
.         }
.       })
.   }
. }
.})

Any suggestions? Thanks!
----
Wong