Posted to user@spark.apache.org by Mekal Zheng <me...@gmail.com> on 2016/07/15 07:17:17 UTC

scala.MatchError on stand-alone cluster mode

Hi,

I have a Spark Streaming job written in Scala that runs well in local and
client mode, but when I submit it in cluster mode, the driver reports the
error shown below.
Does anyone know what is wrong here?
Please help me!

The job code follows the log.

16/07/14 17:28:21 DEBUG ByteBufUtil: -Dio.netty.threadLocalDirectBufferSize: 65536
16/07/14 17:28:21 DEBUG NetUtil: Loopback interface: lo (lo, 0:0:0:0:0:0:0:1%lo)
16/07/14 17:28:21 DEBUG NetUtil: /proc/sys/net/core/somaxconn: 32768
16/07/14 17:28:21 DEBUG TransportServer: Shuffle server started on port :43492
16/07/14 17:28:21 INFO Utils: Successfully started service 'Driver' on port 43492.
16/07/14 17:28:21 INFO WorkerWatcher: Connecting to worker spark://Worker@172.20.130.98:23933
16/07/14 17:28:21 DEBUG TransportClientFactory: Creating new connection to /172.20.130.98:23933
Exception in thread "main" java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
        at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: scala.MatchError: [Ljava.lang.String;@68d279ec (of class [Ljava.lang.String;)
        at com.jd.deeplog.LogAggregator$.main(LogAggregator.scala:29)
        at com.jd.deeplog.LogAggregator.main(LogAggregator.scala)
        ... 6 more

==================
Job CODE:

object LogAggregator {

  val batchDuration = Seconds(5)

  def main(args:Array[String]) {

    val usage =
      """Usage: LogAggregator <zkQuorum> <group> <topics> <numThreads> <logFormat> <logSeparator> <batchDuration> <destType> <destPath>
        |  logFormat: fieldName:fieldRole[,fieldName:fieldRole] each field must have both name and role
        |  logFormat.role: can be key|avg|enum|sum|ignore
      """.stripMargin

    if (args.length < 9) {
      System.err.println(usage)
      System.exit(1)
    }

    val Array(zkQuorum, group, topics, numThreads, logFormat, logSeparator, batchDuration, destType, destPath) = args

    println("Start streaming calculation...")

    val conf = new SparkConf().setAppName("LBHaproxy-LogAggregator")
    val ssc = new StreamingContext(conf, Seconds(batchDuration.toInt))

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

    val logFields = logFormat.split(",").map(field => {
      val fld = field.split(":")
      if (fld.size != 2) {
        System.err.println("Wrong parameters for logFormat!\n")
        System.err.println(usage)
        System.exit(1)
      }
      // TODO: ensure the field has both 'name' and 'role'
      new LogField(fld(0), fld(1))
    })

    val keyFields = logFields.filter(logFieldName => {
      logFieldName.role == "key"
    })
    val keys = keyFields.map(key => {
      key.name
    })

    val logsByKey = lines.map(line => {
      val log = new Log(logFields, line, logSeparator)
      log.toMap
    }).filter(log => log.nonEmpty).map(log => {
      val keys = keyFields.map(logField => {
        log(logField.name).value
      })

      val key = keys.reduce((key1, key2) => {
        key1.asInstanceOf[String] + key2.asInstanceOf[String]
      })

      val fullLog = log + ("count" -> new LogSegment("sum", 1))

      (key, fullLog)
    })


    val aggResults = logsByKey.reduceByKey((log_a, log_b) => {

      log_a.map(logField => {
        val logFieldName = logField._1
        val logSegment_a = logField._2
        val logSegment_b = log_b(logFieldName)

        val segValue = logSegment_a.role match {
          case "avg" => {
            logSegment_a.value.toString.toInt + logSegment_b.value.toString.toInt
          }
          case "sum" => {
            logSegment_a.value.toString.toInt + logSegment_b.value.toString.toInt
          }
          case "enum" => {
            val list_a = logSegment_a.value.asInstanceOf[List[(String, Int)]]
            val list_b = logSegment_b.value.asInstanceOf[List[(String, Int)]]
            list_a ++ list_b
          }
          case _ => logSegment_a.value
        }
        (logFieldName, new LogSegment(logSegment_a.role, segValue))
      })
    }).map(logRecord => {
      val log = logRecord._2
      val count = log("count").value.toString.toInt


      val logContent = log.map(logField => {
        val logFieldName = logField._1
        val logSegment = logField._2
        val fieldValue = logSegment.role match {
          case "avg" => {
            logSegment.value.toString.toInt / count
          }
          case "enum" => {
            val enumList = logSegment.value.asInstanceOf[List[(String, Int)]]
            val enumJson = enumList.groupBy(_._1).map(el => el._2.reduce((e1, e2) => (e1._1, e1._2.toString.toInt + e2._2.toString.toInt)))
            JSONObject(enumJson)
          }
          case _ => logSegment.value
        }
        (logFieldName, fieldValue)
      })

      logContent + ("count" -> count)
    })

    if (destType == "hbase") {

      val hbaseQuorum = "localhost"
      val hbaseClientPort = "2181"
      val hbaseStore = new HBaseStore(hbaseQuorum, hbaseClientPort, keys.toList, "tb_", true)

      val jobConf = hbaseStore.jobConf()

      aggResults.foreachRDD((rdd, time) => {
        rdd.map(record => {
          val logPut = hbaseStore.convert(record, time)
          (new ImmutableBytesWritable, logPut)
        }).saveAsHadoopDataset(jobConf)
      })
    } else if (destType == "file") {
      aggResults.foreachRDD((rdd, time) => {
        rdd.foreach(record => {
          val res = record + ("timestamp" -> time.milliseconds)
          io.File(destPath).appendAll(res.toString() + "\n")
        })
      })
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
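
Incidentally, the stack trace points at LogAggregator.scala:29, which in the snippet above appears to be the "val Array(...) = args" destructuring: that extractor only matches an array of exactly nine elements, while the guard only rejects fewer than nine. A minimal sketch of a parse that reports any unexpected arity (illustrative only; runJob is a hypothetical helper standing in for the body of main):

args match {
  case Array(zkQuorum, group, topics, numThreads, logFormat,
             logSeparator, batchDuration, destType, destPath) =>
    // exactly nine arguments: proceed with the streaming setup
    runJob(zkQuorum, group, topics, numThreads, logFormat,
           logSeparator, batchDuration, destType, destPath)
  case _ =>
    // too few or too many arguments: fall back to the usage message
    System.err.println(usage)
    System.exit(1)
}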


-- 
Mekal Zheng
Sent with Airmail

Re: scala.MatchError on stand-alone cluster mode

Posted by Mekal Zheng <me...@gmail.com>.
Hi, Rishabh Bhardwaj, Saisai Shao,

Thanks for your help. I have found that the root cause is that I forgot to
upload the jar package to all of the nodes in the cluster, so after the
master distributed the job and selected one node as the driver, the driver
could not find the jar package and threw an exception.
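
For reference, in standalone cluster mode one way to avoid this is to point the job at a jar location every node can reach (for example an HDFS path, or the same filesystem path copied to every worker), since the master may pick any worker to run the driver. A minimal sketch, assuming a hypothetical HDFS location for the assembly jar:

import org.apache.spark.SparkConf

// Hypothetical jar URI; the point is only that it must be resolvable
// from whichever worker the master selects to run the driver.
val conf = new SparkConf()
  .setAppName("LBHaproxy-LogAggregator")
  .setJars(Seq("hdfs://namenode:8020/jars/log-aggregator-assembly.jar"))

Passing the jar to spark-submit by such a globally visible URI (or copying it to an identical path on every node) achieves the same effect.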

-- 
Mekal Zheng
Sent with Airmail

From: Rishabh Bhardwaj <rb...@gmail.com>
Reply-To: Rishabh Bhardwaj <rb...@gmail.com>
Date: July 15, 2016 at 17:28:43
To: Saisai Shao <sa...@gmail.com>
Cc: Mekal Zheng <me...@gmail.com>, spark users <us...@spark.apache.org>
Subject: Re: scala.MatchError on stand-alone cluster mode

Hi Mekal,
It may be a Scala version mismatch error; kindly check whether both your
streaming app and the Spark cluster are running on Scala 2.10 or 2.11.

Thanks,
Rishabh.
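
If it helps, the quickest thing to verify is that the Scala suffix of the Spark artifacts in the build matches the Scala version the cluster was built with. A hedged build.sbt sketch (the version numbers are placeholders, not taken from this thread):

// build.sbt -- the _2.10 / _2.11 suffix selected by %% must match the cluster's Scala build
scalaVersion := "2.10.6"  // placeholder; use whatever the cluster was compiled against

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"       % "1.6.2" % "provided",  // placeholder version
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.2"                // placeholder version
)

At runtime, printing scala.util.Properties.versionString from the driver shows which Scala the job is actually running on.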

On Fri, Jul 15, 2016 at 1:38 PM, Saisai Shao <sa...@gmail.com> wrote:

> The error stack trace is thrown from your code:
>
> Caused by: scala.MatchError: [Ljava.lang.String;@68d279ec (of class [Ljava.lang.String;)
>         at com.jd.deeplog.LogAggregator$.main(LogAggregator.scala:29)
>         at com.jd.deeplog.LogAggregator.main(LogAggregator.scala)
>
> I think you should debug the code yourself; it may not be a problem with Spark.

Re: scala.MatchError on stand-alone cluster mode

Posted by Saisai Shao <sa...@gmail.com>.
The error stack trace is thrown from your code:

Caused by: scala.MatchError: [Ljava.lang.String;@68d279ec (of class [Ljava.lang.String;)
        at com.jd.deeplog.LogAggregator$.main(LogAggregator.scala:29)
        at com.jd.deeplog.LogAggregator.main(LogAggregator.scala)

I think you should debug the code yourself; it may not be a problem with Spark.
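
For anyone hitting the same trace: a scala.MatchError whose value prints as [Ljava.lang.String;@... means an Array[String] failed to match a pattern, which typically happens when an Array extractor expects a different number of elements than it receives. A minimal reproduction of the mechanism, unrelated to any Spark API:

// Array extractor patterns require an exact arity match.
val ok = Array("a", "b", "c")
val Array(x, y, z) = ok             // fine: three names, three elements

val bad = Array("a", "b", "c", "d")
val Array(p, q, r) = bad            // throws scala.MatchError: [Ljava.lang.String;@...

Appending _* to the pattern (val Array(p, q, r, _*) = bad), or matching on the array explicitly, avoids the exception when extra elements are present.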
