You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Mekal Zheng <me...@gmail.com> on 2016/07/15 07:17:17 UTC
scala.MatchError on stand-alone cluster mode
Hi,
I have a Spark Streaming job written in Scala that runs well in local
and client mode, but when I submit it in cluster mode, the driver reports
the error shown below.
Does anyone know what is wrong here?
Please help!
The job code follows the log.
16/07/14 17:28:21 DEBUG ByteBufUtil:
-Dio.netty.threadLocalDirectBufferSize: 65536
16/07/14 17:28:21 DEBUG NetUtil: Loopback interface: lo (lo,
0:0:0:0:0:0:0:1%lo)
16/07/14 17:28:21 DEBUG NetUtil: /proc/sys/net/core/somaxconn: 32768
16/07/14 17:28:21 DEBUG TransportServer: Shuffle server started on port
:43492
16/07/14 17:28:21 INFO Utils: Successfully started service 'Driver' on port
43492.
16/07/14 17:28:21 INFO WorkerWatcher: Connecting to worker spark://
Worker@172.20.130.98:23933
16/07/14 17:28:21 DEBUG TransportClientFactory: Creating new connection to /
172.20.130.98:23933
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at
org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
at
org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: scala.MatchError: [Ljava.lang.String;@68d279ec (of class
[Ljava.lang.String;)
at com.jd.deeplog.LogAggregator$.main(LogAggregator.scala:29)
at com.jd.deeplog.LogAggregator.main(LogAggregator.scala)
... 6 more
==================
Job CODE:
object LogAggregator {
/** A fixed 5-second batch interval. Note: `main` does not read this member; it builds its interval from the command-line arguments. */
val batchDuration = Seconds(5L)
def main(args: Array[String]): Unit = {
  val usage =
    """Usage: LogAggregator <zkQuorum> <group> <topics> <numThreads> <logFormat> <logSeparator> <batchDuration> <destType> <destPath>
      | logFormat: fieldName:fieldRole[,fieldName:fieldRole] each field must have both name and role
      | logFormat.role: can be key|avg|enum|sum|ignore
      |""".stripMargin

  // The Array pattern binding below throws scala.MatchError unless `args` has
  // exactly nine elements, so any other count must be rejected here, not just
  // fewer than nine.
  if (args.length != 9) {
    System.err.println(s"Expected 9 arguments but got ${args.length}")
    System.err.println(usage)
    System.exit(1)
  }

  val Array(zkQuorum, group, topics, numThreads, logFormat,
    logSeparator, batchSeconds, destType, destPath) = args

  println("Start streaming calculation...")

  val conf = new SparkConf().setAppName("LBHaproxy-LogAggregator")
  val ssc = new StreamingContext(conf, Seconds(batchSeconds.toInt))

  val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
  val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

  // Every comma-separated logFormat entry must be exactly "name:role".
  val logFields = logFormat.split(",").map { field =>
    val fld = field.split(":")
    if (fld.length != 2) {
      System.err.println("Wrong parameters for logFormat!\n")
      System.err.println(usage)
      System.exit(1)
    }
    new LogField(fld(0), fld(1))
  }

  val keyFields = logFields.filter(_.role == "key")
  // The per-record key is built by reducing the key-field values; with no key
  // fields that reduce would throw for every record, so fail fast instead.
  if (keyFields.isEmpty) {
    System.err.println("logFormat must contain at least one field with role 'key'!\n")
    System.err.println(usage)
    System.exit(1)
  }
  val keys = keyFields.map(_.name)

  // Parse each line into a field-name -> segment map, drop lines that produced
  // an empty map, and key each record by its concatenated key-field values.
  val logsByKey = lines.map { line =>
    val log = new Log(logFields, line, logSeparator)
    log.toMap
  }.filter(_.nonEmpty).map { log =>
    val keyValues = keyFields.map(logField => log(logField.name).value)
    val key = keyValues.reduce((key1, key2) =>
      key1.asInstanceOf[String] + key2.asInstanceOf[String])
    // Synthetic per-record counter (role "sum"); "avg" fields are divided by it below.
    (key, log + ("count" -> new LogSegment("sum", 1)))
  }

  // Merge two records sharing a key, field by field, according to each field's role.
  val aggResults = logsByKey.reduceByKey { (logA, logB) =>
    logA.map { case (fieldName, segA) =>
      val segB = logB(fieldName)
      val merged = segA.role match {
        // "avg" is accumulated as a running sum here and divided by "count" later.
        case "avg" | "sum" =>
          segA.value.toString.toInt + segB.value.toString.toInt
        case "enum" =>
          segA.value.asInstanceOf[List[(String, Int)]] ++
            segB.value.asInstanceOf[List[(String, Int)]]
        case _ => segA.value
      }
      (fieldName, new LogSegment(segA.role, merged))
    }
  }.map { case (_, log) =>
    val count = log("count").value.toString.toInt
    val logContent = log.map { case (fieldName, segment) =>
      val fieldValue = segment.role match {
        case "avg" => segment.value.toString.toInt / count
        case "enum" =>
          val enumList = segment.value.asInstanceOf[List[(String, Int)]]
          // Total occurrence count per distinct enum value.
          val enumCounts = enumList.groupBy(_._1).map { case (enumValue, occurrences) =>
            (enumValue, occurrences.map(_._2).sum)
          }
          JSONObject(enumCounts)
        case _ => segment.value
      }
      (fieldName, fieldValue)
    }
    logContent + ("count" -> count)
  }

  if (destType == "hbase") {
    // NOTE(review): quorum/port are hard-coded to localhost:2181; confirm this is
    // reachable from every node that runs this code when submitted in cluster mode.
    val hbaseQuorum = "localhost"
    val hbaseClientPort = "2181"
    val hbaseStore = new HBaseStore(hbaseQuorum, hbaseClientPort,
      keys.toList, "tb_", true)
    val jobConf = hbaseStore.jobConf()
    aggResults.foreachRDD((rdd, time) => {
      rdd.map { record =>
        val logPut = hbaseStore.convert(record, time)
        (new ImmutableBytesWritable, logPut)
      }.saveAsHadoopDataset(jobConf)
    })
  } else if (destType == "file") {
    aggResults.foreachRDD((rdd, time) => {
      // NOTE(review): RDD.foreach runs on the executors, so in cluster mode each
      // executor appends to destPath on its own local filesystem — confirm intended.
      rdd.foreach { record =>
        val res = record + ("timestamp" -> time.milliseconds)
        io.File(destPath).appendAll(res.toString() + "\n")
      }
    })
  } else {
    // Without an output operation StreamingContext.start() would fail with a
    // far less helpful error, so report the bad destType directly.
    System.err.println(s"Unknown destType: $destType (expected hbase or file)")
    System.err.println(usage)
    System.exit(1)
  }

  ssc.start()
  ssc.awaitTermination()
}
--
Mekal Zheng
Sent with Airmail
Re: scala.MatchError on stand-alone cluster mode
Posted by Mekal Zheng <me...@gmail.com>.
Hi, Rishabh Bhardwaj, Saisai Shao,
Thanks for your help. I have found that the root cause was that I forgot to upload
the jar package to all of the nodes in the cluster, so after the master
distributed the job and selected one node as the driver, the driver could
not find the jar package and threw an exception.
--
Mekal Zheng
Sent with Airmail
发件人: Rishabh Bhardwaj <rb...@gmail.com> <rb...@gmail.com>
回复: Rishabh Bhardwaj <rb...@gmail.com> <rb...@gmail.com>
日期: July 15, 2016 at 17:28:43
至: Saisai Shao <sa...@gmail.com> <sa...@gmail.com>
抄送: Mekal Zheng <me...@gmail.com> <me...@gmail.com>, spark users
<us...@spark.apache.org> <us...@spark.apache.org>
主题: Re: scala.MatchError on stand-alone cluster mode
Hi Mekal,
It may be a Scala version mismatch error; kindly check whether you are
running both (your streaming app and the Spark cluster) on Scala 2.10 or both on 2.11.
Thanks,
Rishabh.
On Fri, Jul 15, 2016 at 1:38 PM, Saisai Shao <sa...@gmail.com> wrote:
> The error stack is throwing from your code:
>
> Caused by: scala.MatchError: [Ljava.lang.String;@68d279ec (of class
> [Ljava.lang.String;)
> at com.jd.deeplog.LogAggregator$.main(LogAggregator.scala:29)
> at com.jd.deeplog.LogAggregator.main(LogAggregator.scala)
>
> I think you should debug the code yourself, it may not be the problem of
> Spark.
>
> On Fri, Jul 15, 2016 at 3:17 PM, Mekal Zheng <me...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I have a Spark Streaming job written in Scala and is running well on
>> local and client mode, but when I submit it on cluster mode, the driver
>> reported an error shown as below.
>> Is there anyone know what is wrong here?
>> pls help me!
>>
>> the Job CODE is after
>>
>> 16/07/14 17:28:21 DEBUG ByteBufUtil:
>> -Dio.netty.threadLocalDirectBufferSize: 65536
>> 16/07/14 17:28:21 DEBUG NetUtil: Loopback interface: lo (lo,
>> 0:0:0:0:0:0:0:1%lo)
>> 16/07/14 17:28:21 DEBUG NetUtil: /proc/sys/net/core/somaxconn: 32768
>> 16/07/14 17:28:21 DEBUG TransportServer: Shuffle server started on port
>> :43492
>> 16/07/14 17:28:21 INFO Utils: Successfully started service 'Driver' on
>> port 43492.
>> 16/07/14 17:28:21 INFO WorkerWatcher: Connecting to worker spark://
>> Worker@172.20.130.98:23933
>> 16/07/14 17:28:21 DEBUG TransportClientFactory: Creating new connection
>> to /172.20.130.98:23933
>> Exception in thread "main" java.lang.reflect.InvocationTargetException
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:497)
>> at
>> org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
>> at
>> org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
>> Caused by: scala.MatchError: [Ljava.lang.String;@68d279ec (of class
>> [Ljava.lang.String;)
>> at com.jd.deeplog.LogAggregator$.main(LogAggregator.scala:29)
>> at com.jd.deeplog.LogAggregator.main(LogAggregator.scala)
>> ... 6 more
>>
>> ==================
>> Job CODE:
>>
>> object LogAggregator {
>>
>> val batchDuration = Seconds(5)
>>
>> def main(args:Array[String]) {
>>
>> val usage =
>> """Usage: LogAggregator <zkQuorum> <group> <topics> <numThreads> <logFormat> <logSeparator> <batchDuration> <destType> <destPath>
>> | logFormat: fieldName:fieldRole[,fieldName:fieldRole] each field must have both name and role
>> | logFormat.role: can be key|avg|enum|sum|ignore
>> """.stripMargin
>>
>> if (args.length < 9) {
>> System.err.println(usage)
>> System.exit(1)
>> }
>>
>> val Array(zkQuorum, group, topics, numThreads, logFormat, logSeparator, batchDuration, destType, destPath) = args
>>
>> println("Start streaming calculation...")
>>
>> val conf = new SparkConf().setAppName("LBHaproxy-LogAggregator")
>> val ssc = new StreamingContext(conf, Seconds(batchDuration.toInt))
>>
>> val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
>>
>> val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
>>
>> val logFields = logFormat.split(",").map(field => {
>> val fld = field.split(":")
>> if (fld.size != 2) {
>> System.err.println("Wrong parameters for logFormat!\n")
>> System.err.println(usage)
>> System.exit(1)
>> }
>> // TODO: ensure the field has both 'name' and 'role'
>> new LogField(fld(0), fld(1))
>> })
>>
>> val keyFields = logFields.filter(logFieldName => {
>> logFieldName.role == "key"
>> })
>> val keys = keyFields.map(key => {
>> key.name
>> })
>>
>> val logsByKey = lines.map(line => {
>> val log = new Log(logFields, line, logSeparator)
>> log.toMap
>> }).filter(log => log.nonEmpty).map(log => {
>> val keys = keyFields.map(logField => {
>> log(logField.name).value
>> })
>>
>> val key = keys.reduce((key1, key2) => {
>> key1.asInstanceOf[String] + key2.asInstanceOf[String]
>> })
>>
>> val fullLog = log + ("count" -> new LogSegment("sum", 1))
>>
>> (key, fullLog)
>> })
>>
>>
>> val aggResults = logsByKey.reduceByKey((log_a, log_b) => {
>>
>> log_a.map(logField => {
>> val logFieldName = logField._1
>> val logSegment_a = logField._2
>> val logSegment_b = log_b(logFieldName)
>>
>> val segValue = logSegment_a.role match {
>> case "avg" => {
>> logSegment_a.value.toString.toInt + logSegment_b.value.toString.toInt
>> }
>> case "sum" => {
>> logSegment_a.value.toString.toInt + logSegment_b.value.toString.toInt
>> }
>> case "enum" => {
>> val list_a = logSegment_a.value.asInstanceOf[List[(String, Int)]]
>> val list_b = logSegment_b.value.asInstanceOf[List[(String, Int)]]
>> list_a ++ list_b
>> }
>> case _ => logSegment_a.value
>> }
>> (logFieldName, new LogSegment(logSegment_a.role, segValue))
>> })
>> }).map(logRecord => {
>> val log = logRecord._2
>> val count = log("count").value.toString.toInt
>>
>>
>> val logContent = log.map(logField => {
>> val logFieldName = logField._1
>> val logSegment = logField._2
>> val fieldValue = logSegment.role match {
>> case "avg" => {
>> logSegment.value.toString.toInt / count
>> }
>> case "enum" => {
>> val enumList = logSegment.value.asInstanceOf[List[(String, Int)]]
>> val enumJson = enumList.groupBy(_._1).map(el => el._2.reduce((e1, e2) => (e1._1, e1._2.toString.toInt + e2._2.toString.toInt)))
>> JSONObject(enumJson)
>> }
>> case _ => logSegment.value
>> }
>> (logFieldName, fieldValue)
>> })
>>
>> logContent + ("count" -> count)
>> })
>>
>> if (destType == "hbase") {
>>
>> val hbaseQuorum = "localhost"
>> val hbaseClientPort = "2181"
>> val hbaseStore = new HBaseStore(hbaseQuorum, hbaseClientPort, keys.toList, "tb_", true)
>>
>> val jobConf = hbaseStore.jobConf()
>>
>> aggResults.foreachRDD((rdd, time) => {
>> rdd.map(record => {
>> val logPut = hbaseStore.convert(record, time)
>> (new ImmutableBytesWritable, logPut)
>> }).saveAsHadoopDataset(jobConf)
>> })
>> } else if (destType == "file") {
>> aggResults.foreachRDD((rdd, time) => {
>> rdd.foreach(record => {
>> val res = record + ("timestamp" -> time.milliseconds)
>> io.File(destPath).appendAll(res.toString() + "\n")
>> })
>> })
>> }
>>
>> ssc.start()
>> ssc.awaitTermination()
>> }
>>
>>
>> --
>> Mekal Zheng
>> Sent with Airmail
>>
>
>
Re: scala.MatchError on stand-alone cluster mode
Posted by Saisai Shao <sa...@gmail.com>.
The error is thrown from your code:
Caused by: scala.MatchError: [Ljava.lang.String;@68d279ec (of class
[Ljava.lang.String;)
at com.jd.deeplog.LogAggregator$.main(LogAggregator.scala:29)
at com.jd.deeplog.LogAggregator.main(LogAggregator.scala)
I think you should debug the code yourself; it may not be a problem with
Spark.
On Fri, Jul 15, 2016 at 3:17 PM, Mekal Zheng <me...@gmail.com> wrote:
> Hi,
>
> I have a Spark Streaming job written in Scala and is running well on local
> and client mode, but when I submit it on cluster mode, the driver reported
> an error shown as below.
> Is there anyone know what is wrong here?
> pls help me!
>
> the Job CODE is after
>
> 16/07/14 17:28:21 DEBUG ByteBufUtil:
> -Dio.netty.threadLocalDirectBufferSize: 65536
> 16/07/14 17:28:21 DEBUG NetUtil: Loopback interface: lo (lo,
> 0:0:0:0:0:0:0:1%lo)
> 16/07/14 17:28:21 DEBUG NetUtil: /proc/sys/net/core/somaxconn: 32768
> 16/07/14 17:28:21 DEBUG TransportServer: Shuffle server started on port
> :43492
> 16/07/14 17:28:21 INFO Utils: Successfully started service 'Driver' on
> port 43492.
> 16/07/14 17:28:21 INFO WorkerWatcher: Connecting to worker spark://
> Worker@172.20.130.98:23933
> 16/07/14 17:28:21 DEBUG TransportClientFactory: Creating new connection to
> /172.20.130.98:23933
> Exception in thread "main" java.lang.reflect.InvocationTargetException
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at
> org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
> at
> org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
> Caused by: scala.MatchError: [Ljava.lang.String;@68d279ec (of class
> [Ljava.lang.String;)
> at com.jd.deeplog.LogAggregator$.main(LogAggregator.scala:29)
> at com.jd.deeplog.LogAggregator.main(LogAggregator.scala)
> ... 6 more
>
> ==================
> Job CODE:
>
> object LogAggregator {
>
> val batchDuration = Seconds(5)
>
> def main(args:Array[String]) {
>
> val usage =
> """Usage: LogAggregator <zkQuorum> <group> <topics> <numThreads> <logFormat> <logSeparator> <batchDuration> <destType> <destPath>
> | logFormat: fieldName:fieldRole[,fieldName:fieldRole] each field must have both name and role
> | logFormat.role: can be key|avg|enum|sum|ignore
> """.stripMargin
>
> if (args.length < 9) {
> System.err.println(usage)
> System.exit(1)
> }
>
> val Array(zkQuorum, group, topics, numThreads, logFormat, logSeparator, batchDuration, destType, destPath) = args
>
> println("Start streaming calculation...")
>
> val conf = new SparkConf().setAppName("LBHaproxy-LogAggregator")
> val ssc = new StreamingContext(conf, Seconds(batchDuration.toInt))
>
> val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
>
> val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
>
> val logFields = logFormat.split(",").map(field => {
> val fld = field.split(":")
> if (fld.size != 2) {
> System.err.println("Wrong parameters for logFormat!\n")
> System.err.println(usage)
> System.exit(1)
> }
> // TODO: ensure the field has both 'name' and 'role'
> new LogField(fld(0), fld(1))
> })
>
> val keyFields = logFields.filter(logFieldName => {
> logFieldName.role == "key"
> })
> val keys = keyFields.map(key => {
> key.name
> })
>
> val logsByKey = lines.map(line => {
> val log = new Log(logFields, line, logSeparator)
> log.toMap
> }).filter(log => log.nonEmpty).map(log => {
> val keys = keyFields.map(logField => {
> log(logField.name).value
> })
>
> val key = keys.reduce((key1, key2) => {
> key1.asInstanceOf[String] + key2.asInstanceOf[String]
> })
>
> val fullLog = log + ("count" -> new LogSegment("sum", 1))
>
> (key, fullLog)
> })
>
>
> val aggResults = logsByKey.reduceByKey((log_a, log_b) => {
>
> log_a.map(logField => {
> val logFieldName = logField._1
> val logSegment_a = logField._2
> val logSegment_b = log_b(logFieldName)
>
> val segValue = logSegment_a.role match {
> case "avg" => {
> logSegment_a.value.toString.toInt + logSegment_b.value.toString.toInt
> }
> case "sum" => {
> logSegment_a.value.toString.toInt + logSegment_b.value.toString.toInt
> }
> case "enum" => {
> val list_a = logSegment_a.value.asInstanceOf[List[(String, Int)]]
> val list_b = logSegment_b.value.asInstanceOf[List[(String, Int)]]
> list_a ++ list_b
> }
> case _ => logSegment_a.value
> }
> (logFieldName, new LogSegment(logSegment_a.role, segValue))
> })
> }).map(logRecord => {
> val log = logRecord._2
> val count = log("count").value.toString.toInt
>
>
> val logContent = log.map(logField => {
> val logFieldName = logField._1
> val logSegment = logField._2
> val fieldValue = logSegment.role match {
> case "avg" => {
> logSegment.value.toString.toInt / count
> }
> case "enum" => {
> val enumList = logSegment.value.asInstanceOf[List[(String, Int)]]
> val enumJson = enumList.groupBy(_._1).map(el => el._2.reduce((e1, e2) => (e1._1, e1._2.toString.toInt + e2._2.toString.toInt)))
> JSONObject(enumJson)
> }
> case _ => logSegment.value
> }
> (logFieldName, fieldValue)
> })
>
> logContent + ("count" -> count)
> })
>
> if (destType == "hbase") {
>
> val hbaseQuorum = "localhost"
> val hbaseClientPort = "2181"
> val hbaseStore = new HBaseStore(hbaseQuorum, hbaseClientPort, keys.toList, "tb_", true)
>
> val jobConf = hbaseStore.jobConf()
>
> aggResults.foreachRDD((rdd, time) => {
> rdd.map(record => {
> val logPut = hbaseStore.convert(record, time)
> (new ImmutableBytesWritable, logPut)
> }).saveAsHadoopDataset(jobConf)
> })
> } else if (destType == "file") {
> aggResults.foreachRDD((rdd, time) => {
> rdd.foreach(record => {
> val res = record + ("timestamp" -> time.milliseconds)
> io.File(destPath).appendAll(res.toString() + "\n")
> })
> })
> }
>
> ssc.start()
> ssc.awaitTermination()
> }
>
>
> --
> Mekal Zheng
> Sent with Airmail
>