You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "John Hogue (JIRA)" <ji...@apache.org> on 2016/01/21 01:06:39 UTC
[jira] [Created] (SPARK-12944) CrossValidator doesn't accept a
Pipeline as an estimator
John Hogue created SPARK-12944:
----------------------------------
Summary: CrossValidator doesn't accept a Pipeline as an estimator
Key: SPARK-12944
URL: https://issues.apache.org/jira/browse/SPARK-12944
Project: Spark
Issue Type: Bug
Components: ML, PySpark
Affects Versions: 1.6.0
Environment: spark-1.6.0-bin-hadoop2.6
Python 3.4.4 :: Anaconda 2.4.1
Reporter: John Hogue
Priority: Minor
Pipeline is supposed to act as an estimator, but CrossValidator currently throws an error when one is passed.
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and nb.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
nb = NaiveBayes()
pipeline = Pipeline(stages=[tokenizer, hashingTF, nb])
paramGrid = ParamGridBuilder().addGrid(nb.smoothing, [0, 1]).build()
cv = CrossValidator(estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=MulticlassClassificationEvaluator(),
numFolds=4)
cvModel = cv.fit(training_df)
A sample dataset can be found here:
https://github.com/dreyco676/nlp_spark/blob/master/data.zip
The file can be converted to a DataFrame with:
# Load precleaned training set
training_rdd = sc.textFile("data/clean_training.txt")
parts_rdd = training_rdd.map(lambda l: l.split("\t"))
# Filter bad rows out
garantee_col_rdd = parts_rdd.filter(lambda l: len(l) == 3)
typed_rdd = garantee_col_rdd.map(lambda p: (p[0], p[1], float(p[2])))
# Create DataFrame
training_df = sqlContext.createDataFrame(typed_rdd, ["id", "text", "label"])
Running the pipeline throws the following stack trace:
---------------------------------------------------------------------------Py4JJavaError Traceback (most recent call last)<ipython-input-3-34e9e27acada> in <module>()
17 numFolds=4)
18
---> 19 cvModel = cv.fit(training_df)
/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/pipeline.py in fit(self, dataset, params)
67 return self.copy(params)._fit(dataset)
68 else:
---> 69 return self._fit(dataset)
70 else:
71 raise ValueError("Params must be either a param map or a list/tuple of param maps, "
/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/tuning.py in _fit(self, dataset)
237 train = df.filter(~condition)
238 for j in range(numModels):
--> 239 model = est.fit(train, epm[j])
240 # TODO: duplicate evaluator to take extra params from input
241 metric = eva.evaluate(model.transform(validation, epm[j]))
/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/pipeline.py in fit(self, dataset, params)
65 elif isinstance(params, dict):
66 if params:
---> 67 return self.copy(params)._fit(dataset)
68 else:
69 return self._fit(dataset)
/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/pipeline.py in _fit(self, dataset)
211 dataset = stage.transform(dataset)
212 else: # must be an Estimator
--> 213 model = stage.fit(dataset)
214 transformers.append(model)
215 if i < indexOfLastEstimator:
/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/pipeline.py in fit(self, dataset, params)
67 return self.copy(params)._fit(dataset)
68 else:
---> 69 return self._fit(dataset)
70 else:
71 raise ValueError("Params must be either a param map or a list/tuple of param maps, "
/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/wrapper.py in _fit(self, dataset)
130
131 def _fit(self, dataset):
--> 132 java_model = self._fit_java(dataset)
133 return self._create_model(java_model)
134
/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/wrapper.py in _fit_java(self, dataset)
126 :return: fitted Java model
127 """
--> 128 self._transfer_params_to_java()
129 return self._java_obj.fit(dataset._jdf)
130
/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/wrapper.py in _transfer_params_to_java(self)
80 for param in self.params:
81 if param in paramMap:
---> 82 pair = self._make_java_param_pair(param, paramMap[param])
83 self._java_obj.set(pair)
84
/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/wrapper.py in _make_java_param_pair(self, param, value)
71 java_param = self._java_obj.getParam(param.name)
72 java_value = _py2java(sc, value)
---> 73 return java_param.w(java_value)
74
75 def _transfer_params_to_java(self):
/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
811 answer = self.gateway_client.send_command(command)
812 return_value = get_return_value(
--> 813 answer, self.gateway_client, self.target_id, self.name)
814
815 for temp_arg in temp_args:
/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/sql/utils.py in deco(*a, **kw)
43 def deco(*a, **kw):
44 try:
---> 45 return f(*a, **kw)
46 except py4j.protocol.Py4JJavaError as e:
47 s = e.java_exception.toString()
/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
306 raise Py4JJavaError(
307 "An error occurred while calling {0}{1}{2}.\n".
--> 308 format(target_id, ".", name), value)
309 else:
310 raise Py4JError(
Py4JJavaError: An error occurred while calling o113.w.
: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Double
at scala.runtime.BoxesRunTime.unboxToDouble(BoxesRunTime.java:119)
at org.apache.spark.ml.param.DoubleParam.w(params.scala:223)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org