You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2017/05/10 13:40:04 UTC

[jira] [Resolved] (SPARK-20695) Running multiple TCP socket streams in Spark Shell causes driver error

     [ https://issues.apache.org/jira/browse/SPARK-20695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen resolved SPARK-20695.
-------------------------------
    Resolution: Invalid

I don't believe that's anything to do with TCP; you are enabling Kryo registration but didn't register some class you are serializing. This is a question about debugging your app and shouldn't be a Spark JIRA.

You need to read http://spark.apache.org/contributing.html too; you would never set Blocker for example.

> Running multiple TCP socket streams in Spark Shell causes driver error
> ----------------------------------------------------------------------
>
>                 Key: SPARK-20695
>                 URL: https://issues.apache.org/jira/browse/SPARK-20695
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams, Spark Core, Spark Shell, Structured Streaming
>    Affects Versions: 2.0.2
>         Environment: DataStax DSE apache 3 node cassandra running with analytics on RHEL 7.3 on Hyper-V windows 10 laptop.
>            Reporter: Peter Mead
>            Priority: Blocker
>
> Whenever I include a second socket stream (lines02) the script errors if I am not trying to process data. If I remove the lines02.... the script runs fine!!
> script:
> val s_server01="192.168.1.10"
> val s_port01  = 8801
> val s_port02  = 8802
> import org.apache.spark.streaming._, org.apache.spark.streaming.StreamingContext._
> import scala.util.Random
> import org.apache.spark._
> import org.apache.spark.storage._
> import org.apache.spark.streaming.receiver._
> import java.util.Date;
> import java.text.SimpleDateFormat;
> import java.util.Calendar;
> import sys.process._
> import org.apache.spark.streaming.dstream.ConstantInputDStream
> sc.setLogLevel("ERROR")
> val df2 = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss")
> var processed:Long = 0
> var pdate=""
> case class t_row (card_number: String, event_date: Int, event_time: Int, processed: Long, transport_type: String, card_credit: java.lang.Float, 
> transport_location: String, journey_type: Int,  journey_value: java.lang.Float)
> var type2tot = 0
> var type5tot = 0
> var numb=0
> var total_secs:Double = 0
> val red    = "\033[0;31m"
> val green  = "\033[0;32m"
> val cyan   = "\033[0;36m"
> val yellow = "\033[0;33m"
> val nocolour = "\033[0;0m"
> var color = ""
> val t_int = 5
> var init = 0
> var tot_cnt:Long = 0
> val ssc = new StreamingContext(sc, Seconds(t_int))
> val lines01 = ssc.socketTextStream(s_server01, s_port01)
> val lines02 = ssc.socketTextStream(s_server01, s_port02)
> // val lines   = lines01.union(lines02)
> val line01 = lines01.foreachRDD( rdd => {
> println("\n------------line 01")
> if (init == 0) {"clear".!;init = 1}
> val xx=rdd.map(bb => (bb.substring(0,bb.length).split(",")))
> val processed = System.currentTimeMillis
> val bb = xx.map(line => t_row(line(0), line(1).toInt, line(2).toInt, System.currentTimeMillis, line(3), line(4).toFloat, line(5), 
> line(6).toInt, line(7).toFloat ))
> val cnt:Long = bb.count
> bb.saveToCassandra("transport", "card_data_input")
> })
> //val line02 = lines02.foreachRDD( rdd => {
> //println("------------line 02")
> //if (init == 0) {"clear".!;init = 1}
> //val xx=rdd.map(bb => (bb.substring(0,bb.length).split(",")))
> //xx.collect.foreach(println)
> //val processed = System.currentTimeMillis
> //val bb = xx.map(line => t_row(line(0), line(1).toInt, line(2).toInt, System.currentTimeMillis, line(3), line(4).toFloat, line(5), 
> //line(6).toInt, line(7).toFloat ))
> //val cnt:Long = bb.count
> //bb.saveToCassandra("transport", "card_data_input")
> //})
> ERROR:
> software.kryo.KryoException: Encountered unregistered class ID: 13994
>         at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
>         at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
>         at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
>         at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
>         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)...etc



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org