You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by Dirceu Semighini Filho <di...@gmail.com> on 2015/01/21 17:43:34 UTC

Issue with repartition and cache

Hi guys, has anyone seen something like this?
I have a training set, and when I repartition it and then call cache, it throws
a ClassCastException when I try to execute anything that accesses it:

val rep120 = train.repartition(120)
val cached120 = rep120.cache
cached120.map(f => f(1).asInstanceOf[Int]).sum

Cell Toolbar:
   In [1]:

ClusterSettings.executorMemory=Some("28g")

ClusterSettings.maxResultSize = "20g"

ClusterSettings.resume=true

ClusterSettings.coreInstanceType="r3.xlarge"

ClusterSettings.coreInstanceCount = 30

ClusterSettings.clusterName="UberdataContextCluster-Dirceu"

uc.applyDateFormat("YYMMddHH")

Searching for existing cluster UberdataContextCluster-Dirceu ...
Spark standalone cluster started at
http://ec2-54-68-91-64.us-west-2.compute.amazonaws.com:8080
Found 1 master(s), 30 slaves
Ganglia started at
http://ec2-54-68-91-64.us-west-2.compute.amazonaws.com:5080/ganglia

In [37]:

import org.apache.spark.sql.catalyst.types._

import eleflow.uberdata.util.IntStringImplicitTypeConverter._

import eleflow.uberdata.enums.SupportedAlgorithm._

import eleflow.uberdata.data._

import org.apache.spark.mllib.tree.DecisionTree

import eleflow.uberdata.enums.DateSplitType._

import org.apache.spark.mllib.regression.LabeledPoint

import org.apache.spark.mllib.linalg.Vectors

import org.apache.spark.mllib.classification._

import eleflow.uberdata.model._

import eleflow.uberdata.data.stat.Statistics

import eleflow.uberdata.enums.ValidationMethod._

import org.apache.spark.rdd.RDD

In [5]:

val train = uc.load(uc.toHDFSURI("/tmp/data/input/train_rev4.csv")).applyColumnTypes(Seq(DecimalType(),
LongType,TimestampType, StringType,


             StringType, StringType, StringType, StringType,


              StringType, StringType, StringType, StringType,


              StringType, StringType, StringType, StringType,


             StringType, StringType, StringType, StringType,


              LongType, LongType,StringType, StringType,StringType,


              StringType,StringType))

.formatDateValues(2,DayOfAWeek | Period).slice(excludes = Seq(12,13))

Out[5]:
idclickhour1hour2C1banner_possite_idsite_domainsite_categoryapp_idapp_domain
app_categorydevice_modeldevice_typedevice_conn_typeC14C15C16C17C18C19C20C21
100000941815109427302.03.0100501fbe01fef384576728905ebdecad23867801e8d9
07d7df2244956a241215706320501722035-1791000016934911786371502.03.010050
1fbe01fef384576728905ebdecad23867801e8d907d7df22711ee1201015704320501722035
100084791000037190421511948602.03.0100501fbe01fef384576728905ebdecad2386
7801e8d907d7df228a4875bd1015704320501722035100084791000064072448083837602.0
3.0100501fbe01fef384576728905ebdecad23867801e8d907d7df226332421a101570632050
1722035100084791000067905641704209602.03.010051fe8cc4489166c1610569f928
ecad23867801e8d907d7df22779d90c21018993320502161035-115710000720757801103869
02.03.010050d6137915bb1ef334f028772becad23867801e8d907d7df228a4875bd1016920
32050189904311000771171000072472998854491102.03.0100508fda644b25d4cfcd
f028772becad23867801e8d907d7df22be6db1d71020362320502333039-1157
In [7]:

val test = uc.load(uc.toHDFSURI("/tmp/data/input/test_rev4.csv")).applyColumnTypes(Seq(DecimalType(),
TimestampType, StringType,


             StringType, StringType, StringType, StringType,


              StringType, StringType, StringType, StringType,


              StringType, StringType, StringType, StringType,


             StringType, StringType, StringType, StringType,


              LongType, LongType,StringType, StringType,StringType,


              StringType,StringType)).

formatDateValues(1,DayOfAWeek | Period).slice(excludes =Seq(11,12))

Out[7]:
idhour1hour2C1banner_possite_idsite_domainsite_categoryapp_idapp_domain
app_categorydevice_modeldevice_typedevice_conn_typeC14C15C16C17C18C19C20C21
100001740588092635695.03.010050235ba823f6ebf28ef028772becad23867801e8d9
07d7df220eb711ec10833032050761317510007523100001825269208554285.03.010050
1fbe01fef384576728905ebdecad23867801e8d907d7df22ecb851b21022676320502616035
10008351100005541398292139845.03.0100501fbe01fef384576728905ebdecad2386
7801e8d907d7df221f0bc64f102267632050261603510008351100010946378097988455.0
3.01005085f751fdc4e18dd650e219e051cedd4eaefc06bd0f2161f8542422a7101864832050
1092380910015661100013770415586707455.03.01005085f751fdc4e18dd650e219e0
9c13b4192347f47af95efa071f0bc64f1023160320502667047-122110001521204153353724
5.03.01005157fe1b205b626596f028772becad23867801e8d907d7df2268b6db2c106563320
50572239-132100019110567070233785.03.0100501fbe01fef384576728905ebdecad2386
7801e8d907d7df22d4897fef102281332050264723910014823
In [ ]:

val (validationPrediction2, logRegModel2, testDataSet2,
validationDataSet2, trainDataSet2, testPrediction2) =

        eleflow.uberdata.data.Predictor.predict(train,test,excludes=
Seq(6,7,9,10,12,13), iterations = 100, algorithm =
BinaryLogisticRegressionBFGS)

spent time 1943

Out[5]:

MappedRDD[165] at map at Predictor.scala:265

In [ ]:

val corr2 = eleflow.uberdata.data.stat.Statistics.targetCorrelation(validationDataSet2)

In [ ]:

val correlated = corr2.filter(_._1>0.01)

In [ ]:

val correlated2 = correlated.map(_._2)

Out[8]:

Array(11768, 11285, 11278, 11289, 12051, 11279, 42, 11805, 11767, 46,
22, 12063, 20, 8388, 11438, 11783, 8981, 11408, 8386, 11360, 11377,
12059, 11418, 12044, 11771, 11359, 11839, 9118, 9116, 8666, 11986,
8665, 8888, 8887, 18, 12058, 11925, 11468, 11336, 11769, 9254, 9029,
11404, 9028, 71, 11982, 11798, 63, 7401, 8673, 12040, 8664, 4986, 452,
11949, 12050, 76, 11800, 8975, 11189, 11743, 11956, 11801, 12026,
8976, 11784, 2418, 11808, 12054, 11904, 1819, 7, 1840, 11429, 11608,
11983, 11387, 9403, 11495, 11985, 8658, 1020, 11626, 8384, 41, 8387,
11778, 4390, 7067, 11489, 11542, 3, 8381, 9154, 11766, 11479, 9077,
10782, 11680, 11830, 12043, 8926, 8982, 11409, 11391, 11364, 8656,
1274, 5523, 9, 12025, 8279, 1528, 10, 11490, 12046, 6771, 3937, 11450,
11811, 8632, 38, 8898, 11382, 12028, 12053, 4563, 5040, 11330, 1983,
11799, 11327, 11672, 8628, 11342, 11813, 6450, 11825, 8941, 10407,
11806, 11643, 8940, 9405, 11757, 9075, 12056, 11522, 11688, 10406,
11322, 9076, 29, 12064, 8637, 11347, 10831, 11406, 11773, 40, 10560,
11645, 9404, 11789, 11651, 9743, 11835, 11843, 9382, 11971, 11646,
12065, 11984, 8681, 10563, 12039, 9383, 8680, 8391, 3260, 5453, 10120,
8602, 11649, 9385, 4320, 9384, 11210, 11750, 11319, 11787, 11506,
11628, 11415, 11777, 10576, 240, 12017, 0, 10121, 11644, 8929, 11392,
12024, 5602, 9280, 11473, 884, 11812, 10741, 11780, 11503, 8672,
11357, 11966, 12055, 11539, 8644, 11350, 11836, 9058, 11271, 11764,
5094, 7881, 11504, 11698, 11424, 11831, 11587, 11426, 2577, 11610,
8948, 11987, 10744, 9290, 11477, 11497, 11367, 8622, 11969, 12030,
8062, 11664, 11704, 10949, 11508, 10530, 10225, 7655, 4274, 10534,
11394, 8934, 15, 11671, 11845, 12069, 6767, 3713, 8979, 11310, 10670,
8978, 11498, 11281, 11291, 11549, 11840, 10119, 10419, 897, 5875,
11482, 10617, 9331, 10618, 11662, 12060, 11496, 10654, 9742, 11422,
12027, 11545, 6612, 9757, 11881, 19, 11321, 11402, 11256, 8389, 9379,
9741, 11705, 5188, 2780, 8593, 11325, 9452, 11255, 9304, 11990, 8393,
11853, 11619, 9312, 9061, 11425, 8385, 11642, 12023, 9303, 8885,
11375, 6807, 8576, 11528, 11485, 11786, 8518, 11834, 12066, 2257,
11345, 11333, 11903, 9918, 11992, 11257, 11488, 11637, 7215, 10556,
11744, 12018, 12031, 1990, 542, 6099, 9005, 11900, 9739, 11566, 11481,
11314, 12052, 11307, 1828, 12072, 5, 10020, 11413, 10138, 11295, 8959,
8025)

In [ ]:

val trained = trainDataSet2.map{f =>

                                val array = f._2.features.toArray

                                new LabeledPoint(f._2.label,Vectors.dense(

correlated2.map(i => array(i))))}.cache

Out[9]:

MappedRDD[175] at map at <console>:52

In [ ]:

val m = Predictor.binaryLogisticRegressionModelSGD(trained,100)

In [23]:

val validated = validationDataSet2.map{f =>

                                val array = f._2.features.toArray

                                (f._1,new LabeledPoint(f._2.label,Vectors.dense(

correlated2.map(i => array(i)))))}.cache

Out[23]:

MappedRDD[682] at map at <console>:71

In [24]:

val prediction = validated.map {

      case (ids, point) =>

        (ids, m.model.asInstanceOf[LogisticRegressionModel].predict(point.features))

    }

Out[24]:

MappedRDD[683] at map at <console>:79

In [20]:

validated.take(2)

Out[20]:

Array((0.0,[0.0,0.0,0.0,1.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]),
(0.0,[0.0,0.0,0.0,1.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]))

In [26]:

val logloss = eleflow.uberdata.data.stat.Statistics.logLoss(prediction)

Out[26]:

5.861273254972684

In [17]:

validationDataSet2.take(3)

Out[17]:

Array((0,(0.0,(12073,[0,1,4,9,18,42,4563,8382,8386,8575,11279,11289,11322,11766,11803,11904,12065],[2.0,3.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]))),
(0,(0.0,(12073,[0,1,4,9,18,42,3260,8382,8386,8577,11279,11289,11322,11766,11803,11904,12065],[2.0,3.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]))),
(0,(0.0,(12073,[0,1,4,10,40,42,4729,8382,8386,8672,11279,11289,11357,11768,11805,11852,12051],[2.0,3.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]))))

In [ ]:

trained.take(4)

In [7]:



import org.apache.spark.mllib.classification._

In [8]:

val steps = Seq(Step(10,2),new Step(7,3), new Step(6,4))

Out[8]:
0Step(10,2)1Step(7,3)2Step(6,4)
In [ ]:

val predictor  =
eleflow.uberdata.data.Predictor.evolutivePredict(train.repartition(240),
test, algorithm = BinaryLogisticRegressionBFGS,

                             validationMethod = LogarithmicLoss, steps
= steps, iterations = 30)

In [ ]:

uc.terminate

In [11]:

train.partitions.size

Out[11]:

94

In [20]:

val rep60 = train.repartition(120)

Out[20]:
idclickhourC1banner_possite_idsite_domainsite_categoryapp_idapp_domain
app_categorydevice_iddevice_ipdevice_modeldevice_typedevice_conn_typeC14C15
C16C17C18C19C20C211769841751868484765301410221210051e8f79e60c4342784f028772b
ecad23867801e8d907d7df22a99f214ab526ff2ce9b8d8d71020634320502374339-123
17703074559452740131141022121005085f751fdc4e18dd650e219e0399477562347f47a
cef3e64932d58615ab5a307674de3ee61221768320502506035-115717708054784542889711
0141022121005085f751fdc4e18dd650e219e051cedd4eaefc06bd0f2161f8a99f214a
d30ecac3542422a7102161132050248032971001116117713001998424865357114102212
1005085f751fdc4e18dd650e219e0bc44c87d7801e8d90f2161f8ad97ca8caa305f51
43836a961020633320502374339-123177175933008005586270141022121005085f751fd
c4e18dd650e219e0f888bf4c5b9c592b0f2161f89a5442e768bc961a1f0bc64f102115332050
2420235-169177224932175731189110141022121005085f751fdc4e18dd650e219e0
e96773f02347f47a0f2161f8a99f214abf741817ef726eae1021767320502506035-1157
17727816327614515164014102212100505bcf81a29d54950bf028772becad23867801e8d9
07d7df22a99f214a5e4ee78bbe87996b1221770320502507035100176157
In [ ]:

val cach60 = rep60.cache

In [28]:

cach60.map(f => f(1).asInstanceOf[Int]).sum

org.apache.spark.SparkException: Job aborted due to stage failure:
Task 53 in stage 53.0 failed 4 times, most recent failure: Lost task
53.3 in stage 53.0 (TID 1322,
ip-172-31-0-62.us-west-2.compute.internal):
java.lang.ClassCastException: java.lang.String cannot be cast to
java.lang.Integer
	at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
	at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:59)
	at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:59)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
	at scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:172)
	at scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1157)
	at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:853)
	at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:851)
	at org.apache.spark.SparkContext$$anonfun$29.apply(SparkContext.scala:1350)
	at org.apache.spark.SparkContext$$anonfun$29.apply(SparkContext.scala:1350)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
	at org.apache.spark.scheduler.Task.run(Task.scala:56)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
    org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
    org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
    scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
    org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    scala.Option.foreach(Option.scala:236)
    org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
    org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
    akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
    akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    akka.actor.ActorCell.invoke(ActorCell.scala:487)
    akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    akka.dispatch.Mailbox.run(Mailbox.scala:220)
    akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



R

Re: Issue with repartition and cache

Posted by ankits <an...@gmail.com>.
Hi,

Did you ever figure this one out? I'm seeing the same behavior:

Calling cache() after a repartition() makes Spark cache the version of the
RDD BEFORE the repartition, which means a shuffle every time it is accessed.

However, calling cache() before the repartition() seems to work fine; the
cached version has the new partitioning applied.


In summary, these 2 patterns don't seem to work as expected:

-------
repartition()
cache()
--------
repartition()
cache()
count()
---------



But this works:

cache()
repartition()

Very strange..



--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Issue-with-repartition-and-cache-tp10235p19664.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscribe@spark.apache.org


Re: Issue with repartition and cache

Posted by Dirceu Semighini Filho <di...@gmail.com>.
Hi Sandy, thanks for the reply.

I tried to run this code without the cache and it worked.
If I cache before the repartition, it also works; the problem seems to be
related to repartition and caching.
My train is a SchemaRDD, and if I make all of my columns StringType, the
error doesn't happen, but if I have any other column type, this exception is thrown.



2015-01-21 16:37 GMT-02:00 Sandy Ryza <sa...@cloudera.com>:

> Hi Dirceu,
>
> Does the issue not show up if you run "map(f =>
> f(1).asInstanceOf[Int]).sum" on the "train" RDD?  It appears that f(1) is
> a String, not an Int.  If you're looking to parse and convert it, "toInt"
> should be used instead of "asInstanceOf".
>
> -Sandy
>
> On Wed, Jan 21, 2015 at 8:43 AM, Dirceu Semighini Filho <
> dirceu.semighini@gmail.com> wrote:
>
>> Hi guys, have anyone find something like this?
>> I have a training set, and when I repartition it, if I call cache it throw
>> a classcastexception when I try to execute anything that access it
>>
>> val rep120 = train.repartition(120)
>> val cached120 = rep120.cache
>> cached120.map(f => f(1).asInstanceOf[Int]).sum
>>
>> Cell Toolbar:
>>    In [1]:
>>
>> ClusterSettings.executorMemory=Some("28g")
>>
>> ClusterSettings.maxResultSize = "20g"
>>
>> ClusterSettings.resume=true
>>
>> ClusterSettings.coreInstanceType="r3.xlarge"
>>
>> ClusterSettings.coreInstanceCount = 30
>>
>> ClusterSettings.clusterName="UberdataContextCluster-Dirceu"
>>
>> uc.applyDateFormat("YYMMddHH")
>>
>> Searching for existing cluster UberdataContextCluster-Dirceu ...
>> Spark standalone cluster started at
>> http://ec2-54-68-91-64.us-west-2.compute.amazonaws.com:8080
>> Found 1 master(s), 30 slaves
>> Ganglia started at
>> http://ec2-54-68-91-64.us-west-2.compute.amazonaws.com:5080/ganglia
>>
>> In [37]:
>>
>> import org.apache.spark.sql.catalyst.types._
>>
>> import eleflow.uberdata.util.IntStringImplicitTypeConverter._
>>
>> import eleflow.uberdata.enums.SupportedAlgorithm._
>>
>> import eleflow.uberdata.data._
>>
>> import org.apache.spark.mllib.tree.DecisionTree
>>
>> import eleflow.uberdata.enums.DateSplitType._
>>
>> import org.apache.spark.mllib.regression.LabeledPoint
>>
>> import org.apache.spark.mllib.linalg.Vectors
>>
>> import org.apache.spark.mllib.classification._
>>
>> import eleflow.uberdata.model._
>>
>> import eleflow.uberdata.data.stat.Statistics
>>
>> import eleflow.uberdata.enums.ValidationMethod._
>>
>> import org.apache.spark.rdd.RDD
>>
>> In [5]:
>>
>> val train =
>> uc.load(uc.toHDFSURI("/tmp/data/input/train_rev4.csv")).applyColumnTypes(Seq(DecimalType(),
>> LongType,TimestampType, StringType,
>>
>>
>>              StringType, StringType, StringType, StringType,
>>
>>
>>               StringType, StringType, StringType, StringType,
>>
>>
>>               StringType, StringType, StringType, StringType,
>>
>>
>>              StringType, StringType, StringType, StringType,
>>
>>
>>               LongType, LongType,StringType, StringType,StringType,
>>
>>
>>               StringType,StringType))
>>
>> .formatDateValues(2,DayOfAWeek | Period).slice(excludes = Seq(12,13))
>>
>> Out[5]:
>>
>> idclickhour1hour2C1banner_possite_idsite_domainsite_categoryapp_idapp_domain
>>
>> app_categorydevice_modeldevice_typedevice_conn_typeC14C15C16C17C18C19C20C21
>> 100000941815109427302.03.0100501fbe01fef384576728905ebdecad23867801e8d9
>> 07d7df2244956a241215706320501722035-1791000016934911786371502.03.010050
>>
>> 1fbe01fef384576728905ebdecad23867801e8d907d7df22711ee1201015704320501722035
>> 100084791000037190421511948602.03.0100501fbe01fef384576728905ebdecad2386
>>
>> 7801e8d907d7df228a4875bd1015704320501722035100084791000064072448083837602.0
>>
>> 3.0100501fbe01fef384576728905ebdecad23867801e8d907d7df226332421a101570632050
>> 1722035100084791000067905641704209602.03.010051fe8cc4489166c1610569f928
>>
>> ecad23867801e8d907d7df22779d90c21018993320502161035-115710000720757801103869
>>
>> 02.03.010050d6137915bb1ef334f028772becad23867801e8d907d7df228a4875bd1016920
>> 32050189904311000771171000072472998854491102.03.0100508fda644b25d4cfcd
>> f028772becad23867801e8d907d7df22be6db1d71020362320502333039-1157
>> In [7]:
>>
>> val test =
>> uc.load(uc.toHDFSURI("/tmp/data/input/test_rev4.csv")).applyColumnTypes(Seq(DecimalType(),
>> TimestampType, StringType,
>>
>>
>>              StringType, StringType, StringType, StringType,
>>
>>
>>               StringType, StringType, StringType, StringType,
>>
>>
>>               StringType, StringType, StringType, StringType,
>>
>>
>>              StringType, StringType, StringType, StringType,
>>
>>
>>               LongType, LongType,StringType, StringType,StringType,
>>
>>
>>               StringType,StringType)).
>>
>> formatDateValues(1,DayOfAWeek | Period).slice(excludes =Seq(11,12))
>>
>> Out[7]:
>> idhour1hour2C1banner_possite_idsite_domainsite_categoryapp_idapp_domain
>>
>> app_categorydevice_modeldevice_typedevice_conn_typeC14C15C16C17C18C19C20C21
>> 100001740588092635695.03.010050235ba823f6ebf28ef028772becad23867801e8d9
>> 07d7df220eb711ec10833032050761317510007523100001825269208554285.03.010050
>>
>> 1fbe01fef384576728905ebdecad23867801e8d907d7df22ecb851b21022676320502616035
>> 10008351100005541398292139845.03.0100501fbe01fef384576728905ebdecad2386
>> 7801e8d907d7df221f0bc64f102267632050261603510008351100010946378097988455.0
>>
>> 3.01005085f751fdc4e18dd650e219e051cedd4eaefc06bd0f2161f8542422a7101864832050
>> 1092380910015661100013770415586707455.03.01005085f751fdc4e18dd650e219e0
>>
>> 9c13b4192347f47af95efa071f0bc64f1023160320502667047-122110001521204153353724
>>
>> 5.03.01005157fe1b205b626596f028772becad23867801e8d907d7df2268b6db2c106563320
>>
>> 50572239-132100019110567070233785.03.0100501fbe01fef384576728905ebdecad2386
>> 7801e8d907d7df22d4897fef102281332050264723910014823
>> In [ ]:
>>
>> val (validationPrediction2, logRegModel2, testDataSet2,
>> validationDataSet2, trainDataSet2, testPrediction2) =
>>
>>         eleflow.uberdata.data.Predictor.predict(train,test,excludes=
>> Seq(6,7,9,10,12,13), iterations = 100, algorithm =
>> BinaryLogisticRegressionBFGS)
>>
>> spent time 1943
>>
>> Out[5]:
>>
>> MappedRDD[165] at map at Predictor.scala:265
>>
>> In [ ]:
>>
>> val corr2 =
>> eleflow.uberdata.data.stat.Statistics.targetCorrelation(validationDataSet2)
>>
>> In [ ]:
>>
>> val correlated = corr2.filter(_._1>0.01)
>>
>> In [ ]:
>>
>> val correlated2 = correlated.map(_._2)
>>
>> Out[8]:
>>
>> Array(11768, 11285, 11278, 11289, 12051, 11279, 42, 11805, 11767, 46,
>> 22, 12063, 20, 8388, 11438, 11783, 8981, 11408, 8386, 11360, 11377,
>> 12059, 11418, 12044, 11771, 11359, 11839, 9118, 9116, 8666, 11986,
>> 8665, 8888, 8887, 18, 12058, 11925, 11468, 11336, 11769, 9254, 9029,
>> 11404, 9028, 71, 11982, 11798, 63, 7401, 8673, 12040, 8664, 4986, 452,
>> 11949, 12050, 76, 11800, 8975, 11189, 11743, 11956, 11801, 12026,
>> 8976, 11784, 2418, 11808, 12054, 11904, 1819, 7, 1840, 11429, 11608,
>> 11983, 11387, 9403, 11495, 11985, 8658, 1020, 11626, 8384, 41, 8387,
>> 11778, 4390, 7067, 11489, 11542, 3, 8381, 9154, 11766, 11479, 9077,
>> 10782, 11680, 11830, 12043, 8926, 8982, 11409, 11391, 11364, 8656,
>> 1274, 5523, 9, 12025, 8279, 1528, 10, 11490, 12046, 6771, 3937, 11450,
>> 11811, 8632, 38, 8898, 11382, 12028, 12053, 4563, 5040, 11330, 1983,
>> 11799, 11327, 11672, 8628, 11342, 11813, 6450, 11825, 8941, 10407,
>> 11806, 11643, 8940, 9405, 11757, 9075, 12056, 11522, 11688, 10406,
>> 11322, 9076, 29, 12064, 8637, 11347, 10831, 11406, 11773, 40, 10560,
>> 11645, 9404, 11789, 11651, 9743, 11835, 11843, 9382, 11971, 11646,
>> 12065, 11984, 8681, 10563, 12039, 9383, 8680, 8391, 3260, 5453, 10120,
>> 8602, 11649, 9385, 4320, 9384, 11210, 11750, 11319, 11787, 11506,
>> 11628, 11415, 11777, 10576, 240, 12017, 0, 10121, 11644, 8929, 11392,
>> 12024, 5602, 9280, 11473, 884, 11812, 10741, 11780, 11503, 8672,
>> 11357, 11966, 12055, 11539, 8644, 11350, 11836, 9058, 11271, 11764,
>> 5094, 7881, 11504, 11698, 11424, 11831, 11587, 11426, 2577, 11610,
>> 8948, 11987, 10744, 9290, 11477, 11497, 11367, 8622, 11969, 12030,
>> 8062, 11664, 11704, 10949, 11508, 10530, 10225, 7655, 4274, 10534,
>> 11394, 8934, 15, 11671, 11845, 12069, 6767, 3713, 8979, 11310, 10670,
>> 8978, 11498, 11281, 11291, 11549, 11840, 10119, 10419, 897, 5875,
>> 11482, 10617, 9331, 10618, 11662, 12060, 11496, 10654, 9742, 11422,
>> 12027, 11545, 6612, 9757, 11881, 19, 11321, 11402, 11256, 8389, 9379,
>> 9741, 11705, 5188, 2780, 8593, 11325, 9452, 11255, 9304, 11990, 8393,
>> 11853, 11619, 9312, 9061, 11425, 8385, 11642, 12023, 9303, 8885,
>> 11375, 6807, 8576, 11528, 11485, 11786, 8518, 11834, 12066, 2257,
>> 11345, 11333, 11903, 9918, 11992, 11257, 11488, 11637, 7215, 10556,
>> 11744, 12018, 12031, 1990, 542, 6099, 9005, 11900, 9739, 11566, 11481,
>> 11314, 12052, 11307, 1828, 12072, 5, 10020, 11413, 10138, 11295, 8959,
>> 8025)
>>
>> In [ ]:
>>
>> val trained = trainDataSet2.map{f =>
>>
>>                                 val array = f._2.features.toArray
>>
>>                                 new LabeledPoint(f._2.label,Vectors.dense(
>>
>> correlated2.map(i => array(i))))}.cache
>>
>> Out[9]:
>>
>> MappedRDD[175] at map at <console>:52
>>
>> In [ ]:
>>
>> val m = Predictor.binaryLogisticRegressionModelSGD(trained,100)
>>
>> In [23]:
>>
>> val validated = validationDataSet2.map{f =>
>>
>>                                 val array = f._2.features.toArray
>>
>>                                 (f._1,new
>> LabeledPoint(f._2.label,Vectors.dense(
>>
>> correlated2.map(i => array(i)))))}.cache
>>
>> Out[23]:
>>
>> MappedRDD[682] at map at <console>:71
>>
>> In [24]:
>>
>> val prediction = validated.map {
>>
>>       case (ids, point) =>
>>
>>         (ids,
>> m.model.asInstanceOf[LogisticRegressionModel].predict(point.features))
>>
>>     }
>>
>> Out[24]:
>>
>> MappedRDD[683] at map at <console>:79
>>
>> In [20]:
>>
>> validated.take(2)
>>
>> Out[20]:
>>
>>
>> Array((0.0,[0.0,0.0,0.0,1.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]),
>>
>> (0.0,[0.0,0.0,0.0,1.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]))
>>
>> In [26]:
>>
>> val logloss = eleflow.uberdata.data.stat.Statistics.logLoss(prediction)
>>
>> Out[26]:
>>
>> 5.861273254972684
>>
>> In [17]:
>>
>> validationDataSet2.take(3)
>>
>> Out[17]:
>>
>>
>> Array((0,(0.0,(12073,[0,1,4,9,18,42,4563,8382,8386,8575,11279,11289,11322,11766,11803,11904,12065],[2.0,3.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]))),
>>
>> (0,(0.0,(12073,[0,1,4,9,18,42,3260,8382,8386,8577,11279,11289,11322,11766,11803,11904,12065],[2.0,3.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]))),
>>
>> (0,(0.0,(12073,[0,1,4,10,40,42,4729,8382,8386,8672,11279,11289,11357,11768,11805,11852,12051],[2.0,3.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]))))
>>
>> In [ ]:
>>
>> trained.take(4)
>>
>> In [7]:
>>
>>
>>
>> import org.apache.spark.mllib.classification._
>>
>> In [8]:
>>
>> val steps = Seq(Step(10,2),new Step(7,3), new Step(6,4))
>>
>> Out[8]:
>> 0Step(10,2)1Step(7,3)2Step(6,4)
>> In [ ]:
>>
>> val predictor  =
>> eleflow.uberdata.data.Predictor.evolutivePredict(train.repartition(240),
>> test, algorithm = BinaryLogisticRegressionBFGS,
>>
>>                              validationMethod = LogarithmicLoss, steps
>> = steps, iterations = 30)
>>
>> In [ ]:
>>
>> uc.terminate
>>
>> In [11]:
>>
>> train.partitions.size
>>
>> Out[11]:
>>
>> 94
>>
>> In [20]:
>>
>> val rep60 = train.repartition(120)
>>
>> Out[20]:
>> idclickhourC1banner_possite_idsite_domainsite_categoryapp_idapp_domain
>>
>> app_categorydevice_iddevice_ipdevice_modeldevice_typedevice_conn_typeC14C15
>>
>> C16C17C18C19C20C211769841751868484765301410221210051e8f79e60c4342784f028772b
>> ecad23867801e8d907d7df22a99f214ab526ff2ce9b8d8d71020634320502374339-123
>> 17703074559452740131141022121005085f751fdc4e18dd650e219e0399477562347f47a
>>
>> cef3e64932d58615ab5a307674de3ee61221768320502506035-115717708054784542889711
>> 0141022121005085f751fdc4e18dd650e219e051cedd4eaefc06bd0f2161f8a99f214a
>> d30ecac3542422a7102161132050248032971001116117713001998424865357114102212
>> 1005085f751fdc4e18dd650e219e0bc44c87d7801e8d90f2161f8ad97ca8caa305f51
>> 43836a961020633320502374339-123177175933008005586270141022121005085f751fd
>>
>> c4e18dd650e219e0f888bf4c5b9c592b0f2161f89a5442e768bc961a1f0bc64f102115332050
>> 2420235-169177224932175731189110141022121005085f751fdc4e18dd650e219e0
>> e96773f02347f47a0f2161f8a99f214abf741817ef726eae1021767320502506035-1157
>> 17727816327614515164014102212100505bcf81a29d54950bf028772becad23867801e8d9
>> 07d7df22a99f214a5e4ee78bbe87996b1221770320502507035100176157
>> In [ ]:
>>
>> val cach60 = rep60.cache
>>
>> In [28]:
>>
>> cach60.map(f => f(1).asInstanceOf[Int]).sum
>>
>> org.apache.spark.SparkException: Job aborted due to stage failure:
>> Task 53 in stage 53.0 failed 4 times, most recent failure: Lost task
>> 53.3 in stage 53.0 (TID 1322,
>> ip-172-31-0-62.us-west-2.compute.internal):
>> java.lang.ClassCastException: java.lang.String cannot be cast to
>> java.lang.Integer
>>         at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
>>         at
>> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:59)
>>         at
>> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:59)
>>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>         at
>> scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:172)
>>         at
>> scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1157)
>>         at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:853)
>>         at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:851)
>>         at
>> org.apache.spark.SparkContext$$anonfun$29.apply(SparkContext.scala:1350)
>>         at
>> org.apache.spark.SparkContext$$anonfun$29.apply(SparkContext.scala:1350)
>>         at
>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>         at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>         at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>>         at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>         at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>         at java.lang.Thread.run(Thread.java:745)
>>
>> Driver stacktrace:
>>     org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
>>
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
>>
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
>>
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>     scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>
>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
>>
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>>
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>>     scala.Option.foreach(Option.scala:236)
>>
>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
>>
>> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
>>     akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>
>> org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
>>     akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>     akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>     akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>     akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>>     scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>>
>>
>> R
>>
>
>

Re: Issue with repartition and cache

Posted by Sandy Ryza <sa...@cloudera.com>.
Hi Dirceu,

Does the issue not show up if you run "map(f =>
f(1).asInstanceOf[Int]).sum" on the "train" RDD?  It appears that f(1) is
a String, not an Int.  If you're looking to parse and convert it, "toInt"
should be used instead of "asInstanceOf".

-Sandy

On Wed, Jan 21, 2015 at 8:43 AM, Dirceu Semighini Filho <
dirceu.semighini@gmail.com> wrote:

> Hi guys, have anyone find something like this?
> I have a training set, and when I repartition it, if I call cache it throw
> a classcastexception when I try to execute anything that access it
>
> val rep120 = train.repartition(120)
> val cached120 = rep120.cache
> cached120.map(f => f(1).asInstanceOf[Int]).sum
>
> Cell Toolbar:
>    In [1]:
>
> ClusterSettings.executorMemory=Some("28g")
>
> ClusterSettings.maxResultSize = "20g"
>
> ClusterSettings.resume=true
>
> ClusterSettings.coreInstanceType="r3.xlarge"
>
> ClusterSettings.coreInstanceCount = 30
>
> ClusterSettings.clusterName="UberdataContextCluster-Dirceu"
>
> uc.applyDateFormat("YYMMddHH")
>
> Searching for existing cluster UberdataContextCluster-Dirceu ...
> Spark standalone cluster started at
> http://ec2-54-68-91-64.us-west-2.compute.amazonaws.com:8080
> Found 1 master(s), 30 slaves
> Ganglia started at
> http://ec2-54-68-91-64.us-west-2.compute.amazonaws.com:5080/ganglia
>
> In [37]:
>
> import org.apache.spark.sql.catalyst.types._
>
> import eleflow.uberdata.util.IntStringImplicitTypeConverter._
>
> import eleflow.uberdata.enums.SupportedAlgorithm._
>
> import eleflow.uberdata.data._
>
> import org.apache.spark.mllib.tree.DecisionTree
>
> import eleflow.uberdata.enums.DateSplitType._
>
> import org.apache.spark.mllib.regression.LabeledPoint
>
> import org.apache.spark.mllib.linalg.Vectors
>
> import org.apache.spark.mllib.classification._
>
> import eleflow.uberdata.model._
>
> import eleflow.uberdata.data.stat.Statistics
>
> import eleflow.uberdata.enums.ValidationMethod._
>
> import org.apache.spark.rdd.RDD
>
> In [5]:
>
> val train =
> uc.load(uc.toHDFSURI("/tmp/data/input/train_rev4.csv")).applyColumnTypes(Seq(DecimalType(),
> LongType,TimestampType, StringType,
>
>
>              StringType, StringType, StringType, StringType,
>
>
>               StringType, StringType, StringType, StringType,
>
>
>               StringType, StringType, StringType, StringType,
>
>
>              StringType, StringType, StringType, StringType,
>
>
>               LongType, LongType,StringType, StringType,StringType,
>
>
>               StringType,StringType))
>
> .formatDateValues(2,DayOfAWeek | Period).slice(excludes = Seq(12,13))
>
> Out[5]:
>
> idclickhour1hour2C1banner_possite_idsite_domainsite_categoryapp_idapp_domain
> app_categorydevice_modeldevice_typedevice_conn_typeC14C15C16C17C18C19C20C21
> 100000941815109427302.03.0100501fbe01fef384576728905ebdecad23867801e8d9
> 07d7df2244956a241215706320501722035-1791000016934911786371502.03.010050
> 1fbe01fef384576728905ebdecad23867801e8d907d7df22711ee1201015704320501722035
> 100084791000037190421511948602.03.0100501fbe01fef384576728905ebdecad2386
> 7801e8d907d7df228a4875bd1015704320501722035100084791000064072448083837602.0
>
> 3.0100501fbe01fef384576728905ebdecad23867801e8d907d7df226332421a101570632050
> 1722035100084791000067905641704209602.03.010051fe8cc4489166c1610569f928
>
> ecad23867801e8d907d7df22779d90c21018993320502161035-115710000720757801103869
> 02.03.010050d6137915bb1ef334f028772becad23867801e8d907d7df228a4875bd1016920
> 32050189904311000771171000072472998854491102.03.0100508fda644b25d4cfcd
> f028772becad23867801e8d907d7df22be6db1d71020362320502333039-1157
> In [7]:
>
> val test =
> uc.load(uc.toHDFSURI("/tmp/data/input/test_rev4.csv")).applyColumnTypes(Seq(DecimalType(),
> TimestampType, StringType,
>
>
>              StringType, StringType, StringType, StringType,
>
>
>               StringType, StringType, StringType, StringType,
>
>
>               StringType, StringType, StringType, StringType,
>
>
>              StringType, StringType, StringType, StringType,
>
>
>               LongType, LongType,StringType, StringType,StringType,
>
>
>               StringType,StringType)).
>
> formatDateValues(1,DayOfAWeek | Period).slice(excludes =Seq(11,12))
>
> Out[7]:
> idhour1hour2C1banner_possite_idsite_domainsite_categoryapp_idapp_domain
> app_categorydevice_modeldevice_typedevice_conn_typeC14C15C16C17C18C19C20C21
> 100001740588092635695.03.010050235ba823f6ebf28ef028772becad23867801e8d9
> 07d7df220eb711ec10833032050761317510007523100001825269208554285.03.010050
> 1fbe01fef384576728905ebdecad23867801e8d907d7df22ecb851b21022676320502616035
> 10008351100005541398292139845.03.0100501fbe01fef384576728905ebdecad2386
> 7801e8d907d7df221f0bc64f102267632050261603510008351100010946378097988455.0
>
> 3.01005085f751fdc4e18dd650e219e051cedd4eaefc06bd0f2161f8542422a7101864832050
> 1092380910015661100013770415586707455.03.01005085f751fdc4e18dd650e219e0
>
> 9c13b4192347f47af95efa071f0bc64f1023160320502667047-122110001521204153353724
>
> 5.03.01005157fe1b205b626596f028772becad23867801e8d907d7df2268b6db2c106563320
> 50572239-132100019110567070233785.03.0100501fbe01fef384576728905ebdecad2386
> 7801e8d907d7df22d4897fef102281332050264723910014823
> In [ ]:
>
> val (validationPrediction2, logRegModel2, testDataSet2,
> validationDataSet2, trainDataSet2, testPrediction2) =
>
>         eleflow.uberdata.data.Predictor.predict(train,test,excludes=
> Seq(6,7,9,10,12,13), iterations = 100, algorithm =
> BinaryLogisticRegressionBFGS)
>
> spent time 1943
>
> Out[5]:
>
> MappedRDD[165] at map at Predictor.scala:265
>
> In [ ]:
>
> val corr2 =
> eleflow.uberdata.data.stat.Statistics.targetCorrelation(validationDataSet2)
>
> In [ ]:
>
> val correlated = corr2.filter(_._1>0.01)
>
> In [ ]:
>
> val correlated2 = correlated.map(_._2)
>
> Out[8]:
>
> Array(11768, 11285, 11278, 11289, 12051, 11279, 42, 11805, 11767, 46,
> 22, 12063, 20, 8388, 11438, 11783, 8981, 11408, 8386, 11360, 11377,
> 12059, 11418, 12044, 11771, 11359, 11839, 9118, 9116, 8666, 11986,
> 8665, 8888, 8887, 18, 12058, 11925, 11468, 11336, 11769, 9254, 9029,
> 11404, 9028, 71, 11982, 11798, 63, 7401, 8673, 12040, 8664, 4986, 452,
> 11949, 12050, 76, 11800, 8975, 11189, 11743, 11956, 11801, 12026,
> 8976, 11784, 2418, 11808, 12054, 11904, 1819, 7, 1840, 11429, 11608,
> 11983, 11387, 9403, 11495, 11985, 8658, 1020, 11626, 8384, 41, 8387,
> 11778, 4390, 7067, 11489, 11542, 3, 8381, 9154, 11766, 11479, 9077,
> 10782, 11680, 11830, 12043, 8926, 8982, 11409, 11391, 11364, 8656,
> 1274, 5523, 9, 12025, 8279, 1528, 10, 11490, 12046, 6771, 3937, 11450,
> 11811, 8632, 38, 8898, 11382, 12028, 12053, 4563, 5040, 11330, 1983,
> 11799, 11327, 11672, 8628, 11342, 11813, 6450, 11825, 8941, 10407,
> 11806, 11643, 8940, 9405, 11757, 9075, 12056, 11522, 11688, 10406,
> 11322, 9076, 29, 12064, 8637, 11347, 10831, 11406, 11773, 40, 10560,
> 11645, 9404, 11789, 11651, 9743, 11835, 11843, 9382, 11971, 11646,
> 12065, 11984, 8681, 10563, 12039, 9383, 8680, 8391, 3260, 5453, 10120,
> 8602, 11649, 9385, 4320, 9384, 11210, 11750, 11319, 11787, 11506,
> 11628, 11415, 11777, 10576, 240, 12017, 0, 10121, 11644, 8929, 11392,
> 12024, 5602, 9280, 11473, 884, 11812, 10741, 11780, 11503, 8672,
> 11357, 11966, 12055, 11539, 8644, 11350, 11836, 9058, 11271, 11764,
> 5094, 7881, 11504, 11698, 11424, 11831, 11587, 11426, 2577, 11610,
> 8948, 11987, 10744, 9290, 11477, 11497, 11367, 8622, 11969, 12030,
> 8062, 11664, 11704, 10949, 11508, 10530, 10225, 7655, 4274, 10534,
> 11394, 8934, 15, 11671, 11845, 12069, 6767, 3713, 8979, 11310, 10670,
> 8978, 11498, 11281, 11291, 11549, 11840, 10119, 10419, 897, 5875,
> 11482, 10617, 9331, 10618, 11662, 12060, 11496, 10654, 9742, 11422,
> 12027, 11545, 6612, 9757, 11881, 19, 11321, 11402, 11256, 8389, 9379,
> 9741, 11705, 5188, 2780, 8593, 11325, 9452, 11255, 9304, 11990, 8393,
> 11853, 11619, 9312, 9061, 11425, 8385, 11642, 12023, 9303, 8885,
> 11375, 6807, 8576, 11528, 11485, 11786, 8518, 11834, 12066, 2257,
> 11345, 11333, 11903, 9918, 11992, 11257, 11488, 11637, 7215, 10556,
> 11744, 12018, 12031, 1990, 542, 6099, 9005, 11900, 9739, 11566, 11481,
> 11314, 12052, 11307, 1828, 12072, 5, 10020, 11413, 10138, 11295, 8959,
> 8025)
>
> In [ ]:
>
> val trained = trainDataSet2.map{f =>
>
>                                 val array = f._2.features.toArray
>
>                                 new LabeledPoint(f._2.label,Vectors.dense(
>
> correlated2.map(i => array(i))))}.cache
>
> Out[9]:
>
> MappedRDD[175] at map at <console>:52
>
> In [ ]:
>
> val m = Predictor.binaryLogisticRegressionModelSGD(trained,100)
>
> In [23]:
>
> val validated = validationDataSet2.map{f =>
>
>                                 val array = f._2.features.toArray
>
>                                 (f._1,new
> LabeledPoint(f._2.label,Vectors.dense(
>
> correlated2.map(i => array(i)))))}.cache
>
> Out[23]:
>
> MappedRDD[682] at map at <console>:71
>
> In [24]:
>
> val prediction = validated.map {
>
>       case (ids, point) =>
>
>         (ids,
> m.model.asInstanceOf[LogisticRegressionModel].predict(point.features))
>
>     }
>
> Out[24]:
>
> MappedRDD[683] at map at <console>:79
>
> In [20]:
>
> validated.take(2)
>
> Out[20]:
>
>
> Array((0.0,[0.0,0.0,0.0,1.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]),
>
> (0.0,[0.0,0.0,0.0,1.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]))
>
> In [26]:
>
> val logloss = eleflow.uberdata.data.stat.Statistics.logLoss(prediction)
>
> Out[26]:
>
> 5.861273254972684
>
> In [17]:
>
> validationDataSet2.take(3)
>
> Out[17]:
>
>
> Array((0,(0.0,(12073,[0,1,4,9,18,42,4563,8382,8386,8575,11279,11289,11322,11766,11803,11904,12065],[2.0,3.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]))),
>
> (0,(0.0,(12073,[0,1,4,9,18,42,3260,8382,8386,8577,11279,11289,11322,11766,11803,11904,12065],[2.0,3.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]))),
>
> (0,(0.0,(12073,[0,1,4,10,40,42,4729,8382,8386,8672,11279,11289,11357,11768,11805,11852,12051],[2.0,3.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]))))
>
> In [ ]:
>
> trained.take(4)
>
> In [7]:
>
>
>
> import org.apache.spark.mllib.classification._
>
> In [8]:
>
> val steps = Seq(Step(10,2),new Step(7,3), new Step(6,4))
>
> Out[8]:
> 0Step(10,2)1Step(7,3)2Step(6,4)
> In [ ]:
>
> val predictor  =
> eleflow.uberdata.data.Predictor.evolutivePredict(train.repartition(240),
> test, algorithm = BinaryLogisticRegressionBFGS,
>
>                              validationMethod = LogarithmicLoss, steps
> = steps, iterations = 30)
>
> In [ ]:
>
> uc.terminate
>
> In [11]:
>
> train.partitions.size
>
> Out[11]:
>
> 94
>
> In [20]:
>
> val rep60 = train.repartition(120)
>
> Out[20]:
> idclickhourC1banner_possite_idsite_domainsite_categoryapp_idapp_domain
> app_categorydevice_iddevice_ipdevice_modeldevice_typedevice_conn_typeC14C15
>
> C16C17C18C19C20C211769841751868484765301410221210051e8f79e60c4342784f028772b
> ecad23867801e8d907d7df22a99f214ab526ff2ce9b8d8d71020634320502374339-123
> 17703074559452740131141022121005085f751fdc4e18dd650e219e0399477562347f47a
>
> cef3e64932d58615ab5a307674de3ee61221768320502506035-115717708054784542889711
> 0141022121005085f751fdc4e18dd650e219e051cedd4eaefc06bd0f2161f8a99f214a
> d30ecac3542422a7102161132050248032971001116117713001998424865357114102212
> 1005085f751fdc4e18dd650e219e0bc44c87d7801e8d90f2161f8ad97ca8caa305f51
> 43836a961020633320502374339-123177175933008005586270141022121005085f751fd
>
> c4e18dd650e219e0f888bf4c5b9c592b0f2161f89a5442e768bc961a1f0bc64f102115332050
> 2420235-169177224932175731189110141022121005085f751fdc4e18dd650e219e0
> e96773f02347f47a0f2161f8a99f214abf741817ef726eae1021767320502506035-1157
> 17727816327614515164014102212100505bcf81a29d54950bf028772becad23867801e8d9
> 07d7df22a99f214a5e4ee78bbe87996b1221770320502507035100176157
> In [ ]:
>
> val cach60 = rep60.cache
>
> In [28]:
>
> cach60.map(f => f(1).asInstanceOf[Int]).sum
>
> org.apache.spark.SparkException: Job aborted due to stage failure:
> Task 53 in stage 53.0 failed 4 times, most recent failure: Lost task
> 53.3 in stage 53.0 (TID 1322,
> ip-172-31-0-62.us-west-2.compute.internal):
> java.lang.ClassCastException: java.lang.String cannot be cast to
> java.lang.Integer
>         at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
>         at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:59)
>         at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:59)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         at
> scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:172)
>         at
> scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1157)
>         at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:853)
>         at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:851)
>         at
> org.apache.spark.SparkContext$$anonfun$29.apply(SparkContext.scala:1350)
>         at
> org.apache.spark.SparkContext$$anonfun$29.apply(SparkContext.scala:1350)
>         at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>         at org.apache.spark.scheduler.Task.run(Task.scala:56)
>         at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
>     org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>     scala.Option.foreach(Option.scala:236)
>
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
>
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
>     akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
>     akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     akka.actor.ActorCell.invoke(ActorCell.scala:487)
>     akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>     akka.dispatch.Mailbox.run(Mailbox.scala:220)
>
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>     scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
>
> R
>