Posted to user@spark.apache.org by Rich Tarro <ri...@gmail.com> on 2016/07/01 16:24:53 UTC

Re: Random Forest Classification

Hi Bryan.

Thanks for your continued help.

Here is the code shown in a Jupyter notebook. I figured this was easier
than cutting and pasting the code into an email. If you would like me to
send you the code in a different format, let me know. The necessary data is
all downloaded within the notebook itself.

https://console.ng.bluemix.net/data/notebooks/fe7e578a-401f-4744-a318-b1b6bcf6f5f8/view?access_token=2f6df7b1dfcb3c1c2d94a794506bb282729dab8f05118fafe5f11886326e02fc

A few additional pieces of information.

1. The training dataset is cached before training the model. If the training
dataset is not cached, the model will not train; the call to
model.transform(test) fails with a similar error. No other changes were made
besides caching or not caching. Again, with the training dataset cached, the
model can be successfully trained, as seen in the notebook.

2. I have another version of the notebook where I download the same data in
libsvm format rather than csv. That notebook works fine. All the code is
essentially the same accounting for the difference in file formats.

3. I tested this same code on another Spark cloud platform and it displays
the same symptoms when run there.

Thanks.
Rich


On Wed, Jun 29, 2016 at 12:59 AM, Bryan Cutler <cu...@gmail.com> wrote:

> Are you fitting the VectorIndexer to the entire data set and not just
> training or test data?  If you are able to post your code and some data to
> reproduce, that would help in troubleshooting.
>
> On Tue, Jun 28, 2016 at 4:40 PM, Rich Tarro <ri...@gmail.com> wrote:
>
>> Thanks for the response, but in my case I reversed the meaning of
>> "prediction" and "predictedLabel". It seemed to make more sense to me that
>> way, but in retrospect, it probably only causes confusion to anyone else
>> looking at this. I reran the code with all the pipeline stage inputs and
>> outputs named exactly as in the Random Forest Classifier example to make
>> sure I hadn't messed anything up when I renamed things. Same error.
>>
>> I'm still at the point where I can train the model and make predictions,
>> but not able to get the MulticlassClassificationEvaluator to work on the
>> DataFrame of predictions.
>>
>> Any other suggestions? Thanks.
>>
>>
>>
>> On Tue, Jun 28, 2016 at 4:21 PM, Rich Tarro <ri...@gmail.com> wrote:
>>
>>> I created an ML pipeline using the Random Forest Classifier - similar to
>>> what is described here except in my case the source data is in csv format
>>> rather than libsvm.
>>>
>>>
>>> https://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-classifier
>>>
>>> I am able to successfully train the model and make predictions (on test
>>> data not used to train the model) as shown here.
>>>
>>> +------------+--------------+-----+----------+--------------------+
>>> |indexedLabel|predictedLabel|label|prediction|            features|
>>> +------------+--------------+-----+----------+--------------------+
>>> |         4.0|           4.0|    0|         0|(784,[124,125,126...|
>>> |         2.0|           2.0|    3|         3|(784,[119,120,121...|
>>> |         8.0|           8.0|    8|         8|(784,[180,181,182...|
>>> |         0.0|           0.0|    1|         1|(784,[154,155,156...|
>>> |         3.0|           8.0|    2|         8|(784,[148,149,150...|
>>> +------------+--------------+-----+----------+--------------------+
>>> only showing top 5 rows
>>>
>>> However, when I attempt to calculate the error between the indexedLabel and the predictedLabel using the MulticlassClassificationEvaluator, I get the NoSuchElementException error shown below.
>>>
>>> val evaluator = new MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("predictedLabel").setMetricName("precision")
>>> val accuracy = evaluator.evaluate(predictions)
>>> println("Test Error = " + (1.0 - accuracy))
>>>
>>> What could be the issue?
>>>
>>>
>>>
>>> Name: org.apache.spark.SparkException
>>> Message: Job aborted due to stage failure: Task 2 in stage 49.0 failed 10 times, most recent failure: Lost task 2.9 in stage 49.0 (TID 162, yp-spark-dal09-env5-0024): java.util.NoSuchElementException: key not found: 132.0
>>> 	at scala.collection.MapLike$class.default(MapLike.scala:228)
>>> 	at scala.collection.AbstractMap.default(Map.scala:58)
>>> 	at scala.collection.MapLike$class.apply(MapLike.scala:141)
>>> 	at scala.collection.AbstractMap.apply(Map.scala:58)
>>> 	at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$10.apply(VectorIndexer.scala:331)
>>> 	at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$10.apply(VectorIndexer.scala:309)
>>> 	at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$11.apply(VectorIndexer.scala:351)
>>> 	at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$11.apply(VectorIndexer.scala:351)
>>> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
>>> 	at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:67)
>>> 	at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:67)
>>> 	at org.apache.spark.sql.execution.Filter$$anonfun$2$$anonfun$apply$2.apply(basicOperators.scala:74)
>>> 	at org.apache.spark.sql.execution.Filter$$anonfun$2$$anonfun$apply$2.apply(basicOperators.scala:72)
>>> 	at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> 	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:189)
>>> 	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
>>> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>>> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>> 	at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
>>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>> 	at java.lang.Thread.run(Thread.java:785)
>>>
>>> Driver stacktrace:
>>> StackTrace: org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>>> scala.Option.foreach(Option.scala:236)
>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
>>> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>> java.lang.Thread.getStackTrace(Thread.java:1117)
>>> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
>>> org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
>>> org.apache.spark.SparkContext.runJob(SparkContext.scala:1850)
>>> org.apache.spark.SparkContext.runJob(SparkContext.scala:1863)
>>> org.apache.spark.SparkContext.runJob(SparkContext.scala:1934)
>>> org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>>> org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>>> org.apache.spark.rdd.RDD.collect(RDD.scala:926)
>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$collectAsMap$1.apply(PairRDDFunctions.scala:741)
>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$collectAsMap$1.apply(PairRDDFunctions.scala:740)
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>>> org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>>> org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:740)
>>> org.apache.spark.mllib.evaluation.MulticlassMetrics.tpByClass$lzycompute(MulticlassMetrics.scala:49)
>>> org.apache.spark.mllib.evaluation.MulticlassMetrics.tpByClass(MulticlassMetrics.scala:45)
>>> org.apache.spark.mllib.evaluation.MulticlassMetrics.precision$lzycompute(MulticlassMetrics.scala:142)
>>> org.apache.spark.mllib.evaluation.MulticlassMetrics.precision(MulticlassMetrics.scala:142)
>>> org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator.evaluate(MulticlassClassificationEvaluator.scala:84)
>>> $line110.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:59)
>>> $line110.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:64)
>>> $line110.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:66)
>>> $line110.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:68)
>>> $line110.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:70)
>>> $line110.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:72)
>>> $line110.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:74)
>>> $line110.$read$$iwC$$iwC$$iwC.<init>(<console>:76)
>>> $line110.$read$$iwC$$iwC.<init>(<console>:78)
>>> $line110.$read$$iwC.<init>(<console>:80)
>>> $line110.$read.<init>(<console>:82)
>>> $line110.$read$.<init>(<console>:86)
>>> $line110.$read$.<clinit>(<console>)
>>> $line110.$eval$.<init>(<console>:7)
>>> $line110.$eval$.<clinit>(<console>)
>>> $line110.$eval.$print(<console>)
>>> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:95)
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
>>> java.lang.reflect.Method.invoke(Method.java:507)
>>> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
>>> org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
>>> org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
>>> org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
>>> org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
>>> com.ibm.spark.interpreter.ScalaInterpreter$$anonfun$interpretAddTask$1$$anonfun$apply$3.apply(ScalaInterpreter.scala:296)
>>> com.ibm.spark.interpreter.ScalaInterpreter$$anonfun$interpretAddTask$1$$anonfun$apply$3.apply(ScalaInterpreter.scala:291)
>>> com.ibm.spark.global.StreamState$.withStreams(StreamState.scala:80)
>>> com.ibm.spark.interpreter.ScalaInterpreter$$anonfun$interpretAddTask$1.apply(ScalaInterpreter.scala:290)
>>> com.ibm.spark.interpreter.ScalaInterpreter$$anonfun$interpretAddTask$1.apply(ScalaInterpreter.scala:290)
>>> com.ibm.spark.utils.TaskManager$$anonfun$add$2$$anon$1.run(TaskManager.scala:123)
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>> java.lang.Thread.run(Thread.java:785)
>>>
>>>
>>>
>>>
>>>
>>>
>>
>

Re: Random Forest Classification

Posted by Bryan Cutler <cu...@gmail.com>.
I see. You might try this: create a pipeline of just your feature
transformers, then call fit() on the complete dataset to get a model.
Finally, make a second pipeline and add this model and the decision tree as
stages.
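
A minimal sketch of that two-pipeline approach, reusing the stage variables
from the code quoted further down in this thread (col1_indexer ... decisionTree,
vectorAssembler, featureVectorIndexer) and the dataframes named there
(completeDataset, trainingSet, cvSet):

// Pipeline of only the feature transformers, fitted on the complete dataset
val featurePipeline = new Pipeline().setStages(Array(
  col1_indexer, col2_indexer, col3_indexer, col4_indexer, col5_indexer,
  vectorAssembler, featureVectorIndexer))
val featureModel = featurePipeline.fit(completeDataset)

// The fitted PipelineModel is itself a Transformer, so it can be used as a
// stage in a second pipeline together with the decision tree.
val trainingPipeline = new Pipeline().setStages(Array(featureModel, decisionTree))
val model = trainingPipeline.fit(trainingSet)
val output = model.transform(cvSet)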

On Aug 30, 2016 8:19 PM, "Bahubali Jain" <ba...@gmail.com> wrote:

> Hi Bryan,
> Thanks for the reply.
> I am indexing 5 columns, then using these indexed columns to generate the
> "feature" column through the vector assembler.
> Which essentially means that I cannot use *fit()* directly on the
> "completeDataset" dataframe, since it will have neither the "feature" column
> nor the 5 indexed columns.
> Of course there is a dirty way of doing this, but I am wondering if there is
> some optimized/intelligent approach for this.
>
> Thanks,
> Baahu
>
> On Wed, Aug 31, 2016 at 3:30 AM, Bryan Cutler <cu...@gmail.com> wrote:
>
>> You need to first fit just the VectorIndexer which returns the model,
>> then add the model to the pipeline where it will only transform.
>>
>> val featureVectorIndexer = new VectorIndexer()
>>     .setInputCol("feature")
>>     .setOutputCol("indexedfeature")
>>     .setMaxCategories(180)
>>     .fit(completeDataset)
>>
>> On Tue, Aug 30, 2016 at 9:57 AM, Bahubali Jain <ba...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> I had run into a similar exception, "java.util.NoSuchElementException:
>>> key not found:".
>>> After further investigation I realized it is happening because the
>>> VectorIndexer is fit on the training dataset and not on the entire dataset.
>>>
>>> In the dataframe I have 5 categorical columns; each of these has to go
>>> through a StringIndexer, and the indexed columns are then put through a
>>> VectorIndexer to generate the feature vector.
>>> What is the right way to do this, so that the VectorIndexer can be fit on
>>> the entire data and not just on the training data?
>>>
>>> Below is the current approach; as is evident, the VectorIndexer is being
>>> fit based on the training set.
>>>
>>> Please note: fit() on the VectorIndexer cannot be called on the entireset
>>> dataframe since it doesn't have the required column (the *feature* column is
>>> generated dynamically during pipeline execution).
>>> How can the VectorIndexer be *fit()* on the entireset?
>>>
>>> val col1_indexer = new StringIndexer().setInputCol("col1").setOutputCol("indexed_col1")
>>> val col2_indexer = new StringIndexer().setInputCol("col2").setOutputCol("indexed_col2")
>>> val col3_indexer = new StringIndexer().setInputCol("col3").setOutputCol("indexed_col3")
>>> val col4_indexer = new StringIndexer().setInputCol("col4").setOutputCol("indexed_col4")
>>> val col5_indexer = new StringIndexer().setInputCol("col5").setOutputCol("indexed_col5")
>>>
>>> val featureArray = Array("indexed_col1","indexed_col2","indexed_col3","indexed_col4","indexed_col5")
>>> val vectorAssembler = new VectorAssembler().setInputCols(featureArray).setOutputCol("feature")
>>> val featureVectorIndexer = new VectorIndexer()
>>>     .setInputCol("feature")
>>>     .setOutputCol("indexedfeature")
>>>     .setMaxCategories(180)
>>>
>>> val decisionTree = new DecisionTreeClassifier().setMaxBins(300).setMaxDepth(1)
>>>     .setImpurity("entropy").setLabelCol("indexed_user_action")
>>>     .setFeaturesCol("indexedfeature").setPredictionCol("prediction")
>>>
>>> val pipeline = new Pipeline().setStages(Array(col1_indexer, col2_indexer,
>>>     col3_indexer, col4_indexer, col5_indexer, vectorAssembler,
>>>     featureVectorIndexer, decisionTree))
>>> val model = pipeline.fit(trainingSet)
>>> val output = model.transform(cvSet)
>>>
>>>
>>> Thanks,
>>> Baahu
>>>
>>> On Fri, Jul 8, 2016 at 11:24 PM, Bryan Cutler <cu...@gmail.com> wrote:
>>>
>>>> Hi Rich,
>>>>
>>>> I looked at the notebook and it seems like you are fitting the
>>>> StringIndexer and VectorIndexer to only the training data, and it should
>>>> be the entire data set.  So if the training data does not include all of
>>>> the labels and an unknown label appears in the test data during evaluation,
>>>> then it will not know how to index it.  So your code should be like this,
>>>> fit with 'digits' instead of 'training':
>>>>
>>>> val labelIndexer = new StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(digits)
>>>> // Automatically identify categorical features, and index them.
>>>> // Set maxCategories so features with > 4 distinct values are treated as continuous.
>>>> val featureIndexer = new VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").setMaxCategories(4).fit(digits)
>>>>
>>>> Hope that helps!
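
For reference, a rough sketch of how those fitted indexers then slot into the
pipeline from the linked Random Forest Classifier example; rf and labelConverter
follow that example, and the training/test split names are the ones used
elsewhere in this thread:

// Classifier and label converter as in the linked docs example.
val rf = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setNumTrees(10)
// Convert indexed predictions back to the original label strings.
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labels)
val pipeline = new Pipeline().setStages(Array(labelIndexer, featureIndexer, rf, labelConverter))
val model = pipeline.fit(training)
val predictions = model.transform(test)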
>>>>
>>>
>>>
>>> --
>>> Twitter:http://twitter.com/Baahu
>>>
>>>
>>
>
>
> --
> Twitter:http://twitter.com/Baahu
>
>

Re: Random Forest Classification

Posted by Bahubali Jain <ba...@gmail.com>.
Hi Bryan,
Thanks for the reply.
I am indexing 5 columns, then using these indexed columns to generate the
"feature" column through the vector assembler.
Which essentially means that I cannot use *fit()* directly on the
"completeDataset" dataframe, since it will have neither the "feature" column
nor the 5 indexed columns.
Of course there is a dirty way of doing this, but I am wondering if there is
some optimized/intelligent approach for this.

Thanks,
Baahu
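
One hedged reading of that "dirty way", reusing the stage variables from the
code earlier in the thread: materialize the "feature" column on the complete
dataset by hand, then fit the VectorIndexer on the result.

val indexers = Array(col1_indexer, col2_indexer, col3_indexer, col4_indexer, col5_indexer)
// Fit and apply each StringIndexer so the indexed columns exist on completeDataset.
val indexed = indexers.foldLeft(completeDataset)((df, indexer) => indexer.fit(df).transform(df))
// Assemble the indexed columns into the "feature" vector, then fit the VectorIndexer on it.
val assembled = vectorAssembler.transform(indexed)
val featureVectorIndexerModel = featureVectorIndexer.fit(assembled)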



-- 
Twitter:http://twitter.com/Baahu

Re: Random Forest Classification

Posted by Bryan Cutler <cu...@gmail.com>.
You need to first fit just the VectorIndexer which returns the model, then
add the model to the pipeline where it will only transform.

val featureVectorIndexer = new VectorIndexer()
    .setInputCol("feature")
    .setOutputCol("indexedfeature")
    .setMaxCategories(180)
    .fit(completeDataset)
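
A hedged sketch of the second step described above, with the fitted
VectorIndexerModel added to the pipeline as a stage (where it only transforms);
the other stage variables are the ones from the earlier snippets in this thread.

// featureVectorIndexer is now a fitted VectorIndexerModel, used here as a stage.
val pipeline = new Pipeline().setStages(Array(
  col1_indexer, col2_indexer, col3_indexer, col4_indexer, col5_indexer,
  vectorAssembler, featureVectorIndexer, decisionTree))
val model = pipeline.fit(trainingSet)
val output = model.transform(cvSet)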

>>>>>> org.apache.spark.mllib.evaluation.MulticlassMetrics.precision(MulticlassMetrics.scala:142)org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator.evaluate(MulticlassClassificationEvaluator.scala:84)
>>>>>> $line110.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:59)
>>>>>> $line110.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:64)
>>>>>> $line110.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:66)
>>>>>> $line110.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:68)
>>>>>> $line110.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:70)
>>>>>> $line110.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:72)
>>>>>> $line110.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:74)
>>>>>> $line110.$read$$iwC$$iwC$$iwC.<init>(<console>:76)
>>>>>> $line110.$read$$iwC$$iwC.<init>(<console>:78)
>>>>>> $line110.$read$$iwC.<init>(<console>:80)
>>>>>> $line110.$read.<init>(<console>:82)
>>>>>> $line110.$read$.<init>(<console>:86)
>>>>>> $line110.$read$.<clinit>(<console>)
>>>>>> $line110.$eval$.<init>(<console>:7)
>>>>>> $line110.$eval$.<clinit>(<console>)
>>>>>> $line110.$eval.$print(<console>)
>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:95)
>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
>>>>>> java.lang.reflect.Method.invoke(Method.java:507)
>>>>>> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
>>>>>> org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
>>>>>> org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
>>>>>> org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
>>>>>> org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
>>>>>> com.ibm.spark.interpreter.ScalaInterpreter$$anonfun$interpretAddTask$1$$anonfun$apply$3.apply(ScalaInterpreter.scala:296)
>>>>>> com.ibm.spark.interpreter.ScalaInterpreter$$anonfun$interpretAddTask$1$$anonfun$apply$3.apply(ScalaInterpreter.scala:291)
>>>>>> com.ibm.spark.global.StreamState$.withStreams(StreamState.scala:80)
>>>>>> com.ibm.spark.interpreter.ScalaInterpreter$$anonfun$interpretAddTask$1.apply(ScalaInterpreter.scala:290)
>>>>>> com.ibm.spark.interpreter.ScalaInterpreter$$anonfun$interpretAddTask$1.apply(ScalaInterpreter.scala:290)
>>>>>> com.ibm.spark.utils.TaskManager$$anonfun$add$2$$anon$1.run(TaskManager.scala:123)
>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>>>>> java.lang.Thread.run(Thread.java:785)
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
>
> --
> Twitter:http://twitter.com/Baahu
>
>

Re: Random Forest Classification

Posted by Bahubali Jain <ba...@gmail.com>.
Hi,
I had run into a similar exception, "java.util.NoSuchElementException: key
not found:". After further investigation I realized it happens because the
VectorIndexer is fit on the training dataset and not on the entire dataset.

In the DataFrame I have 5 categorical columns; each of these has to go
through a StringIndexer, and the results are then put through a
VectorIndexer to generate the feature vector.
What is the right way to do this, so that the VectorIndexer can be fit on
the entire data and not just on the training data?

Below is the current approach; as is evident, the VectorIndexer is being
fit on the training set.

Please note: fit() on the VectorIndexer cannot be called on the entire-set
DataFrame, since that DataFrame does not yet have the required column (the
*feature* column is generated dynamically during pipeline execution).
How can the VectorIndexer be *fit()* on the entire set?

val col1_indexer = new StringIndexer().setInputCol("col1").setOutputCol("indexed_col1")
val col2_indexer = new StringIndexer().setInputCol("col2").setOutputCol("indexed_col2")
val col3_indexer = new StringIndexer().setInputCol("col3").setOutputCol("indexed_col3")
val col4_indexer = new StringIndexer().setInputCol("col4").setOutputCol("indexed_col4")
val col5_indexer = new StringIndexer().setInputCol("col5").setOutputCol("indexed_col5")

val featureArray = Array("indexed_col1", "indexed_col2", "indexed_col3",
  "indexed_col4", "indexed_col5")
val vectorAssembler = new VectorAssembler()
  .setInputCols(featureArray).setOutputCol("feature")
val featureVectorIndexer = new VectorIndexer()
  .setInputCol("feature")
  .setOutputCol("indexedfeature")
  .setMaxCategories(180)

val decisionTree = new DecisionTreeClassifier()
  .setMaxBins(300).setMaxDepth(1).setImpurity("entropy")
  .setLabelCol("indexed_user_action")
  .setFeaturesCol("indexedfeature")
  .setPredictionCol("prediction")

val pipeline = new Pipeline().setStages(Array(col1_indexer, col2_indexer,
  col3_indexer, col4_indexer, col5_indexer,
  vectorAssembler, featureVectorIndexer, decisionTree))
val model = pipeline.fit(trainingSet)
val output = model.transform(cvSet)


Thanks,
Baahu
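
A minimal sketch of one possible approach, reusing the stage definitions
from the code above and assuming a DataFrame named entireSet that holds the
full data (with trainingSet and cvSet as above): fit only the
column-generating stages on the entire set first, so the dynamically
created "feature" column exists, then fit the VectorIndexer on that result,
and finally reuse the already-fitted models as stages of the training
pipeline.

import org.apache.spark.ml.Pipeline

// Fit and apply only the column-generating stages (the StringIndexers and
// the VectorAssembler defined above) on the entire data set, which
// materializes the dynamically created "feature" column.
val featurePipeline = new Pipeline().setStages(
  Array(col1_indexer, col2_indexer, col3_indexer, col4_indexer, col5_indexer,
        vectorAssembler))
val featureModel = featurePipeline.fit(entireSet)
val entireWithFeatures = featureModel.transform(entireSet)

// The "feature" column now exists, so the VectorIndexer sees every
// category present in the whole data set.
val featureVectorIndexerModel = featureVectorIndexer.fit(entireWithFeatures)

// Already-fitted models are Transformers and are valid pipeline stages, so
// only the decision tree is actually fit on the training split.
val fullPipeline = new Pipeline().setStages(
  Array(featureModel, featureVectorIndexerModel, decisionTree))
val model = fullPipeline.fit(trainingSet)
val output = model.transform(cvSet)

Fitting the StringIndexers on the entire set in this way also avoids the
same "key not found" failure for category values that appear only in cvSet.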

On Fri, Jul 8, 2016 at 11:24 PM, Bryan Cutler <cu...@gmail.com> wrote:

> Hi Rich,
>
> I looked at the notebook and it seems like you are fitting the
> StringIndexer and VectorIndexer to only the training data, and they should
> be fit on the entire data set.  So if the training data does not include all of
> the labels and an unknown label appears in the test data during evaluation,
> then it will not know how to index it.  So your code should be like this,
> fit with 'digits' instead of 'training':
>
> val labelIndexer = new StringIndexer()
>   .setInputCol("label").setOutputCol("indexedLabel").fit(digits)
> // Automatically identify categorical features, and index them.
> // Set maxCategories so features with > 4 distinct values are treated as
> // continuous.
> val featureIndexer = new VectorIndexer()
>   .setInputCol("features").setOutputCol("indexedFeatures")
>   .setMaxCategories(4).fit(digits)
>
> Hope that helps!
>
> On Fri, Jul 1, 2016 at 9:24 AM, Rich Tarro <ri...@gmail.com> wrote:
>
>> Hi Bryan.
>>
>> Thanks for your continued help.
>>
>> Here is the code shown in a Jupyter notebook. I figured this was easier
>> than cutting and pasting the code into an email. If you would like me to
>> send you the code in a different format, let me know. The necessary data is
>> all downloaded within the notebook itself.
>>
>> https://console.ng.bluemix.net/data/notebooks/fe7e578a-401f-4744-a318-b1b6bcf6f5f8/view?access_token=2f6df7b1dfcb3c1c2d94a794506bb282729dab8f05118fafe5f11886326e02fc
>>
>> A few additional pieces of information.
>>
>> 1. The training dataset is cached before training the model. If you do
>> not cache the training dataset, the model will not train. The code
>> model.transform(test) fails with a similar error. No other changes besides
>> caching or not caching. Again, with the training dataset cached, the model
>> can be successfully trained as seen in the notebook.
>>
>> 2. I have another version of the notebook where I download the same data
>> in libsvm format rather than csv. That notebook works fine. All the code is
>> essentially the same accounting for the difference in file formats.
>>
>> 3. I tested this same code on another Spark cloud platform and it
>> displays the same symptoms when run there.
>>
>> Thanks.
>> Rich
>>
>>
>> On Wed, Jun 29, 2016 at 12:59 AM, Bryan Cutler <cu...@gmail.com> wrote:
>>
>>> Are you fitting the VectorIndexer to the entire data set and not just
>>> training or test data?  If you are able to post your code and some data to
>>> reproduce, that would help in troubleshooting.
>>>
>>> On Tue, Jun 28, 2016 at 4:40 PM, Rich Tarro <ri...@gmail.com> wrote:
>>>
>>>> Thanks for the response, but in my case I reversed the meaning of
>>>> "prediction" and "predictedLabel". It seemed to make more sense to me that
>>>> way, but in retrospect, it probably only causes confusion to anyone else
>>>> looking at this. I reran the code with all the pipeline stage inputs and
>>>> outputs named exactly as in the Random Forest Classifier example to make
>>>> sure I hadn't messed anything up when I renamed things. Same error.
>>>>
>>>> I'm still at the point where I can train the model and make
>>>> predictions, but not able to get the MulticlassClassificationEvaluator
>>>> to work on the DataFrame of predictions.
>>>>
>>>> Any other suggestions? Thanks.
>>>>
>>>>
>>>>
>>>> On Tue, Jun 28, 2016 at 4:21 PM, Rich Tarro <ri...@gmail.com>
>>>> wrote:
>>>>
>>>>> I created a ML pipeline using the Random Forest Classifier - similar
>>>>> to what is described here except in my case the source data is in csv
>>>>> format rather than libsvm.
>>>>>
>>>>> https://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-classifier
>>>>>
>>>>> I am able to successfully train the model and make predictions (on
>>>>> test data not used to train the model) as shown here.
>>>>>
>>>>> +------------+--------------+-----+----------+--------------------+
>>>>> |indexedLabel|predictedLabel|label|prediction|            features|
>>>>> +------------+--------------+-----+----------+--------------------+
>>>>> |         4.0|           4.0|    0|         0|(784,[124,125,126...|
>>>>> |         2.0|           2.0|    3|         3|(784,[119,120,121...|
>>>>> |         8.0|           8.0|    8|         8|(784,[180,181,182...|
>>>>> |         0.0|           0.0|    1|         1|(784,[154,155,156...|
>>>>> |         3.0|           8.0|    2|         8|(784,[148,149,150...|
>>>>> +------------+--------------+-----+----------+--------------------+
>>>>> only showing top 5 rows
>>>>>
>>>>> However, when I attempt to calculate the error between the indexedLabel and the predictedLabel using the MulticlassClassificationEvaluator, I get the NoSuchElementException error attached below.
>>>>>
>>>>> val evaluator = new MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("predictedLabel").setMetricName("precision")
>>>>> val accuracy = evaluator.evaluate(predictions)
>>>>> println("Test Error = " + (1.0 - accuracy))
>>>>>
>>>>> What could be the issue?
>>>>>
>>>>
>>>
>>
>


-- 
Twitter:http://twitter.com/Baahu

Re: Random Forest Classification

Posted by Bryan Cutler <cu...@gmail.com>.
Hi Rich,

I looked at the notebook and it seems like you are fitting the
StringIndexer and VectorIndexer to only the training data, and they should
be fit on the entire data set.  So if the training data does not include all of
the labels and an unknown label appears in the test data during evaluation,
then it will not know how to index it.  So your code should be like this,
fit with 'digits' instead of 'training':

val labelIndexer = new StringIndexer()
  .setInputCol("label").setOutputCol("indexedLabel").fit(digits)
// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as
// continuous.
val featureIndexer = new VectorIndexer()
  .setInputCol("features").setOutputCol("indexedFeatures")
  .setMaxCategories(4).fit(digits)

Hope that helps!
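
A minimal end-to-end sketch of that suggestion, assuming a DataFrame named
digits with "label" and "features" columns and the "precision" metric used
earlier in this thread (newer Spark releases use "accuracy" instead): both
indexers are fit on the full data set before the split, so evaluation never
encounters an unindexed label or feature value.

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{StringIndexer, VectorIndexer}

// Fit both indexers on the full data set so every label and every
// categorical feature value is known before splitting.
val labelIndexer = new StringIndexer()
  .setInputCol("label").setOutputCol("indexedLabel").fit(digits)
val featureIndexer = new VectorIndexer()
  .setInputCol("features").setOutputCol("indexedFeatures")
  .setMaxCategories(4).fit(digits)

// Split only after the indexers have been fit.
val Array(training, test) = digits.randomSplit(Array(0.7, 0.3), seed = 1234L)

val rf = new RandomForestClassifier()
  .setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures")

// The fitted indexers act as plain Transformers here; only the forest is
// fit on the training split.
val pipeline = new Pipeline().setStages(Array(labelIndexer, featureIndexer, rf))
val model = pipeline.fit(training)
val predictions = model.transform(test)

val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel").setPredictionCol("prediction")
  .setMetricName("precision")
println("Test Error = " + (1.0 - evaluator.evaluate(predictions)))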

On Fri, Jul 1, 2016 at 9:24 AM, Rich Tarro <ri...@gmail.com> wrote:

> Hi Bryan.
>
> Thanks for your continued help.
>
> Here is the code shown in a Jupyter notebook. I figured this was easier
> than cutting and pasting the code into an email. If you would like me to
> send you the code in a different format, let me know. The necessary data is
> all downloaded within the notebook itself.
>
>
> https://console.ng.bluemix.net/data/notebooks/fe7e578a-401f-4744-a318-b1b6bcf6f5f8/view?access_token=2f6df7b1dfcb3c1c2d94a794506bb282729dab8f05118fafe5f11886326e02fc
>
> A few additional pieces of information.
>
> 1. The training dataset is cached before training the model. If you do not
> cache the training dataset, the model will not train. The code
> model.transform(test) fails with a similar error. No other changes besides
> caching or not caching. Again, with the training dataset cached, the model
> can be successfully trained as seen in the notebook.
>
> 2. I have another version of the notebook where I download the same data
> in libsvm format rather than csv. That notebook works fine. All the code is
> essentially the same accounting for the difference in file formats.
>
> 3. I tested this same code on another Spark cloud platform and it displays
> the same symptoms when run there.
>
> Thanks.
> Rich
>
>
> On Wed, Jun 29, 2016 at 12:59 AM, Bryan Cutler <cu...@gmail.com> wrote:
>
>> Are you fitting the VectorIndexer to the entire data set and not just
>> training or test data?  If you are able to post your code and some data to
>> reproduce, that would help in troubleshooting.
>>
>> On Tue, Jun 28, 2016 at 4:40 PM, Rich Tarro <ri...@gmail.com> wrote:
>>
>>> Thanks for the response, but in my case I reversed the meaning of
>>> "prediction" and "predictedLabel". It seemed to make more sense to me that
>>> way, but in retrospect, it probably only causes confusion to anyone else
>>> looking at this. I reran the code with all the pipeline stage inputs and
>>> outputs named exactly as in the Random Forest Classifier example to make
>>> sure I hadn't messed anything up when I renamed things. Same error.
>>>
>>> I'm still at the point where I can train the model and make predictions,
>>> but not able to get the MulticlassClassificationEvaluator to work on
>>> the DataFrame of predictions.
>>>
>>> Any other suggestions? Thanks.
>>>
>>>
>>>
>>> On Tue, Jun 28, 2016 at 4:21 PM, Rich Tarro <ri...@gmail.com> wrote:
>>>
>>>> I created a ML pipeline using the Random Forest Classifier - similar to
>>>> what is described here except in my case the source data is in csv format
>>>> rather than libsvm.
>>>>
>>>>
>>>> https://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-classifier
>>>>
>>>> I am able to successfully train the model and make predictions (on test
>>>> data not used to train the model) as shown here.
>>>>
>>>> +------------+--------------+-----+----------+--------------------+
>>>> |indexedLabel|predictedLabel|label|prediction|            features|
>>>> +------------+--------------+-----+----------+--------------------+
>>>> |         4.0|           4.0|    0|         0|(784,[124,125,126...|
>>>> |         2.0|           2.0|    3|         3|(784,[119,120,121...|
>>>> |         8.0|           8.0|    8|         8|(784,[180,181,182...|
>>>> |         0.0|           0.0|    1|         1|(784,[154,155,156...|
>>>> |         3.0|           8.0|    2|         8|(784,[148,149,150...|
>>>> +------------+--------------+-----+----------+--------------------+
>>>> only showing top 5 rows
>>>>
>>>> However, when I attempt to calculate the error between the indexedLabel and the predictedLabel using the MulticlassClassificationEvaluator, I get the NoSuchElementException error attached below.
>>>>
>>>> val evaluator = new MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("predictedLabel").setMetricName("precision")
>>>> val accuracy = evaluator.evaluate(predictions)
>>>> println("Test Error = " + (1.0 - accuracy))
>>>>
>>>> What could be the issue?
>>>>
>>>
>>
>