Posted to user@spark.apache.org by vince plum <li...@gmail.com> on 2016/02/11 08:09:42 UTC

RDD uses another RDD in pyspark with SPARK-5063 issue

Hi, pyspark experts,

I'm trying to implement a naive Bayes library with the same interface as
pyspark.mllib.classification.NaiveBayes, i.e. train() and predict() will be
the public methods.

I finished train(LabeledPoint), but ran into trouble in predict() because
of the SPARK-5063 issue:

*Exception: It appears that you are attempting to broadcast an RDD or
reference an RDD from an action or transformation. RDD transformations and
actions can only be invoked by the driver, not inside of other
transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x)
is invalid because the values transformation and count action cannot be
performed inside of the rdd1.map transformation. For more information, see
SPARK-5063.*


Basically, the model is saved as a member variable of the class, e.g.
self.weight_map.
self.weight_map comes from an RDD (after some transformations) and can be
converted to another data structure with an action such as collect(),
collectAsMap(), etc.

The input to predict(test_tfidf) has to be an RDD of tf-idf vectors, and
the output should be something I can zip() with test_labels.

The question is:

    def predict(self, test_tfidf):
        return test_tfidf.map(
            lambda v: MyNaiveBayes._predict_v(self.weight_map, v))

No matter which non-RDD data structure I use for self.weight_map, it always
raises the SPARK-5063 error.
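
For illustration, here is a minimal sketch of a variant of the predict()
above in which the lambda does not reference self. It rests on an
assumption that the post does not confirm: that the error comes from the
closure capturing self while the instance also holds an RDD or a
SparkContext somewhere. The layout of weight_map, the tf-idf vector format
and the scoring inside _predict_v are hypothetical stand-ins, not the
actual implementation.

```python
class MyNaiveBayes(object):
    def __init__(self, weight_map):
        # Assumed (hypothetical) layout:
        #   {label: (log_prior, {feature_index: log_likelihood})}
        # stored as a plain dict, e.g. the result of collectAsMap().
        self.weight_map = dict(weight_map)

    @staticmethod
    def _predict_v(weight_map, v):
        # v is assumed to be a {feature_index: tfidf_value} dict.
        best_label, best_score = None, float("-inf")
        for label, (log_prior, log_lik) in weight_map.items():
            score = log_prior + sum(
                x * log_lik.get(i, 0.0) for i, x in v.items())
            if score > best_score:
                best_label, best_score = label, score
        return best_label

    def predict(self, test_tfidf):
        # Bind the dict to a local name so the lambda closes over the
        # dict only, not over self and everything self refers to.
        weights = self.weight_map
        return test_tfidf.map(
            lambda v: MyNaiveBayes._predict_v(weights, v))
```

Since predict() is a plain map(), its output keeps the same partitions and
element order as test_tfidf.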

btw, I also tried:
1. broadcast:

    def predict(self, bb, test_tfidf):
        return test_tfidf.map(
            lambda v: MyNaiveBayes._predict_v(bb.value, v))

bb = sc.broadcast(MyNaiveBayes.weight_map)
predict(bb, test_tfidf)

same error

2. cartesian:

ret = test_tfidf.cartesian(self.weight_map).groupByKey()

But I need to zip the output with test_labels, and this triggers the
partition inconsistency issue with zip():

test_labels.zip(model.predict(test_tfidf))
model.predict()
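
For the broadcast attempt in item 1, the following is a minimal sketch of
broadcasting a plain dict rather than an RDD. It assumes the weights start
out as an RDD of (key, value) pairs; the function name
predict_with_broadcast and the predict_v argument are hypothetical and only
stand in for MyNaiveBayes._predict_v. sc.broadcast() and collectAsMap() are
the standard PySpark calls.

```python
def predict_with_broadcast(sc, weight_rdd, test_tfidf, predict_v):
    """Broadcast a dict built from weight_rdd, then map over test_tfidf.

    sc is a SparkContext, weight_rdd an RDD of (key, value) pairs, and
    predict_v a hypothetical function (weights_dict, vector) -> label.
    """
    # collectAsMap() is an action: it runs from the driver and returns
    # an ordinary Python dict.
    weight_dict = weight_rdd.collectAsMap()
    # Broadcast the dict, never the RDD itself.
    bb = sc.broadcast(weight_dict)
    # The lambda captures only bb and predict_v; bb.value is read on
    # the workers.
    return test_tfidf.map(lambda v: predict_v(bb.value, v))
```

Because the result is again a plain map() over test_tfidf, it should be
possible to zip() it with test_labels as long as test_labels and test_tfidf
were themselves produced from the same parent RDD by map() only.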


Any suggestions about the right data structure for self.weight_map in my
MyNaiveBayes class?