Posted to user@spark.apache.org by Andy Davidson <An...@SantaCruzIntegration.com> on 2016/01/05 05:08:20 UTC

problem with DataFrame df.withColumn() org.apache.spark.sql.AnalysisException: resolved attribute(s) missing

I am having a heck of a time writing a simple transformer in Java. I assume
that my Transformer is supposed to append a new column to the DataFrame
argument. Any idea why I get the following exception in Java 8 when I try to
call DataFrame withColumn()? The JavaDoc says withColumn() "Returns a new
DataFrame
<http://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/DataFrame.html>
by adding a column or replacing the existing column that has the same
name.”

Also, do transformers always run in the driver? If not, I assume workers do
not have the sqlContext. Any idea how I can convert a JavaRDD<> to a Column
without a sqlContext?

Kind regards

Andy

P.S. I am using Spark 1.6.0

org.apache.spark.sql.AnalysisException: resolved attribute(s)
filteredOutput#1 missing from rawInput#0 in operator !Project
[rawInput#0,filteredOutput#1 AS filteredOutput#2];
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:44)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:183)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:105)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:44)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$withPlan(DataFrame.scala:2165)
at org.apache.spark.sql.DataFrame.select(DataFrame.scala:751)
at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1227)
at com.pws.poc.ml.StemmerTransformer.transform(StemmerTransformer.java:110)
at com.pws.poc.ml.StemmerTransformerTest.test(StemmerTransformerTest.java:45)



public class StemmerTransformer extends Transformer implements Serializable {

   String inputCol; // unit test sets to rawInput
   String outputCol; // unit test sets to filteredOutput

   …

   public StemmerTransformer(SQLContext sqlContext) {
        // will only work if transformers execute in the driver
        this.sqlContext = sqlContext;
    }

    @Override
    public DataFrame transform(DataFrame df) {
        df.printSchema();
        df.show();

        JavaRDD<Row> inRowRDD = df.select(inputCol).javaRDD();
        JavaRDD<Row> outRowRDD = inRowRDD.map((Row row) -> {
            // TODO add stemming code
            // Create a new Row
            Row ret = RowFactory.create("TODO");
            return ret;
        });

        // can we create a Col from a JavaRDD<Row>?
        List<StructField> fields = new ArrayList<StructField>();
        boolean nullable = true;
        fields.add(DataTypes.createStructField(outputCol, DataTypes.StringType, nullable));

        StructType schema = DataTypes.createStructType(fields);
        DataFrame outputDF = sqlContext.createDataFrame(outRowRDD, schema);
        outputDF.printSchema();
        outputDF.show();
        Column newCol = outputDF.col(outputCol);

        // StemmerTransformer.java:110 in the stack trace above
        return df.withColumn(outputCol, newCol);
    }



SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type
[ch.qos.logback.classic.util.ContextSelectorStaticBinder]
WARN  03:58:46 main o.a.h.u.NativeCodeLoader <clinit> line:62 Unable to load
native-hadoop library for your platform... using builtin-java classes where
applicable

root
 |-- rawInput: array (nullable = false)
 |    |-- element: string (containsNull = true)

+--------------------+
|            rawInput|
+--------------------+
|[I, saw, the, red...|
|[Mary, had, a, li...|
|[greet, greeting,...|
+--------------------+

root
 |-- filteredOutput: string (nullable = true)

+--------------+
|filteredOutput|
+--------------+
|          TODO|
|          TODO|
|          TODO|
+--------------+





Re: problem with DataFrame df.withColumn() org.apache.spark.sql.AnalysisException: resolved attribute(s) missing

Posted by Michael Armbrust <mi...@databricks.com>.
Oh, and I think I installed jekyll using "gem install jekyll"

On Wed, Jan 6, 2016 at 4:17 PM, Michael Armbrust <mi...@databricks.com>
wrote:

> from docs/ run:
>
> SKIP_API=1 jekyll serve --watch
>
> On Wed, Jan 6, 2016 at 4:12 PM, Andy Davidson <
> Andy@santacruzintegration.com> wrote:
>
>> Hi Michael
>>
>> I am happy to add some documentation.
>>
>> I forked the repo but am having trouble with the markdown. The code
>> examples are not rendering correctly. I am on a Mac and using
>> https://itunes.apple.com/us/app/marked-2/id890031187?mt=12
>>
>> I use emacs or some other text editor to change the md.
>>
>> What tools do you use for editing/viewing Spark markdown files?
>>
>> Andy
>>
>>
>>
>> From: Michael Armbrust <mi...@databricks.com>
>> Date: Wednesday, January 6, 2016 at 11:09 AM
>> To: Andrew Davidson <An...@SantaCruzIntegration.com>
>> Cc: "user @spark" <us...@spark.apache.org>
>> Subject: Re: problem with DataFrame df.withColumn()
>> org.apache.spark.sql.AnalysisException: resolved attribute(s) missing
>>
>> I really appreciate your help. The following code works.
>>>
>>
>> Glad you got it to work!
>>
>> Is there a way this example can be added to the distribution to make it
>>> easier for future Java programmers? It took me a long time to get to this
>>> simple solution.
>>>
>>
>> I'd welcome a pull request that added UDFs to the programming guide
>> section on dataframes:
>>
>> http://spark.apache.org/docs/latest/sql-programming-guide.html#dataframe-operations
>>
>>
>

Re: problem with DataFrame df.withColumn() org.apache.spark.sql.AnalysisException: resolved attribute(s) missing

Posted by Michael Armbrust <mi...@databricks.com>.
from docs/ run:

SKIP_API=1 jekyll serve --watch

On Wed, Jan 6, 2016 at 4:12 PM, Andy Davidson <Andy@santacruzintegration.com
> wrote:

> Hi Michael
>
> I am happy to add some documentation.
>
> I forked the repo but am having trouble with the markdown. The code
> examples are not rendering correctly. I am on a Mac and using
> https://itunes.apple.com/us/app/marked-2/id890031187?mt=12
>
> I use emacs or some other text editor to change the md.
>
> What tools do you use for editing/viewing Spark markdown files?
>
> Andy
>
>
>
> From: Michael Armbrust <mi...@databricks.com>
> Date: Wednesday, January 6, 2016 at 11:09 AM
> To: Andrew Davidson <An...@SantaCruzIntegration.com>
> Cc: "user @spark" <us...@spark.apache.org>
> Subject: Re: problem with DataFrame df.withColumn()
> org.apache.spark.sql.AnalysisException: resolved attribute(s) missing
>
> I really appreciate your help. The following code works.
>>
>
> Glad you got it to work!
>
> Is there a way this example can be added to the distribution to make it
>> easier for future Java programmers? It took me a long time to get to this
>> simple solution.
>>
>
> I'd welcome a pull request that added UDFs to the programming guide
> section on dataframes:
>
> http://spark.apache.org/docs/latest/sql-programming-guide.html#dataframe-operations
>
>

Re: problem with DataFrame df.withColumn() org.apache.spark.sql.AnalysisException: resolved attribute(s) missing

Posted by Andy Davidson <An...@SantaCruzIntegration.com>.
Hi Michael

I am happy to add some documentation.

I forked the repo but am having trouble with the markdown. The code examples
are not rendering correctly. I am on a Mac and using
https://itunes.apple.com/us/app/marked-2/id890031187?mt=12

I use emacs or some other text editor to change the md.

What tools do you use for editing/viewing Spark markdown files?

Andy



From:  Michael Armbrust <mi...@databricks.com>
Date:  Wednesday, January 6, 2016 at 11:09 AM
To:  Andrew Davidson <An...@SantaCruzIntegration.com>
Cc:  "user @spark" <us...@spark.apache.org>
Subject:  Re: problem with DataFrame df.withColumn()
org.apache.spark.sql.AnalysisException: resolved attribute(s) missing

>> I really appreciate your help. The following code works.
> 
> Glad you got it to work!
> 
>> Is there a way this example can be added to the distribution to make it
>> easier for future Java programmers? It took me a long time to get to this
>> simple solution.
> 
> I'd welcome a pull request that added UDFs to the programming guide section on
> dataframes:
> http://spark.apache.org/docs/latest/sql-programming-guide.html#dataframe-operations



Re: problem with DataFrame df.withColumn() org.apache.spark.sql.AnalysisException: resolved attribute(s) missing

Posted by Michael Armbrust <mi...@databricks.com>.
>
> I really appreciate your help. The following code works.
>

Glad you got it to work!

Is there a way this example can be added to the distribution to make it
> easier for future Java programmers? It took me a long time to get to this
> simple solution.
>

I'd welcome a pull request that added UDFs to the programming guide section
on dataframes:
http://spark.apache.org/docs/latest/sql-programming-guide.html#dataframe-operations

Re: problem with DataFrame df.withColumn() org.apache.spark.sql.AnalysisException: resolved attribute(s) missing

Posted by Andy Davidson <An...@SantaCruzIntegration.com>.
Hi Michael

I really appreciate your help. The following code works. Is there a way
this example can be added to the distribution to make it easier for future
Java programmers? It took me a long time to get to this simple solution.

I'll need to tweak this example a little to work with the new Pipeline save
functionality. We need the current sqlContext to register our UDF. I'll see
if I can pass this in the Param Map. I'll throw an exception if someone uses
transform(df).
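
One possible tweak, sketched here and untested (names are the ones from the
class below): in Spark 1.6 a DataFrame carries its own context, so the UDF
could be registered lazily inside transform() and the constructor argument
would go away:

    @Override
    public DataFrame transform(DataFrame df) {
        // the incoming DataFrame already knows its SQLContext
        SQLContext ctx = df.sqlContext();
        if (udf == null) {
            udf = new UDF();
            DataType returnType = DataTypes.createArrayType(DataTypes.StringType);
            ctx.udf().register(udfName, udf, returnType);
        }
        return df.selectExpr("*", "StemUDF(rawInput) as filteredOutput");
    }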

public class StemmerTransformer extends Transformer implements Serializable {

   void registerUDF() {
        if (udf == null) {
            udf = new UDF();
            DataType returnType = DataTypes.createArrayType(DataTypes.StringType);
            sqlContext.udf().register(udfName, udf, returnType);
        }
    }

    @Override
    public DataFrame transform(DataFrame df) {
        df.printSchema();
        df.show();

        registerUDF();

        DataFrame ret = df.selectExpr("*", "StemUDF(rawInput) as filteredOutput");
        return ret;
    }

    class UDF implements UDF1<WrappedArray<String>, List<String>> {
        private static final long serialVersionUID = 1L;

        @Override
        public List<String> call(WrappedArray<String> wordsArg) throws Exception {
            List<String> words = JavaConversions.asJavaList(wordsArg);
            ArrayList<String> ret = new ArrayList<String>(words.size());
            for (String word : words) {
                // TODO replace test code
                ret.add(word + "_stemed");
            }
            return ret;
        }
    }
}



root
 |-- rawInput: array (nullable = false)
 |    |-- element: string (containsNull = true)

+--------------------+
|            rawInput|
+--------------------+
|[I, saw, the, red...|
|[Mary, had, a, li...|
|[greet, greeting,...|
+--------------------+

root
 |-- rawInput: array (nullable = false)
 |    |-- element: string (containsNull = true)
 |-- filteredOutput: array (nullable = true)
 |    |-- element: string (containsNull = true)

+----------------------------------+---------------------------------------------------------------+
|rawInput                          |filteredOutput                                                 |
+----------------------------------+---------------------------------------------------------------+
|[I, saw, the, red, baloon]        |[I_stemed, saw_stemed, the_stemed, red_stemed, baloon_stemed]  |
|[Mary, had, a, little, lamb]      |[Mary_stemed, had_stemed, a_stemed, little_stemed, lamb_stemed]|
|[greet, greeting, greets, greeted]|[greet_stemed, greeting_stemed, greets_stemed, greeted_stemed] |
+----------------------------------+---------------------------------------------------------------+
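
For reference, a minimal sketch of driving this transformer, standalone and
from an ML pipeline (Pipeline, PipelineStage, and PipelineModel are the
standard org.apache.spark.ml classes; the variable names are assumed):

    StemmerTransformer stemmer = new StemmerTransformer(sqlContext);

    // standalone
    DataFrame stemmed = stemmer.transform(df);

    // or as a pipeline stage; fit() does no training here since the
    // only stage is a Transformer
    Pipeline pipeline = new Pipeline().setStages(new PipelineStage[] {stemmer});
    PipelineModel model = pipeline.fit(df);
    DataFrame stemmed2 = model.transform(df);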



From:  Michael Armbrust <mi...@databricks.com>
Date:  Tuesday, January 5, 2016 at 12:58 PM
To:  Andrew Davidson <An...@SantaCruzIntegration.com>
Cc:  "user @spark" <us...@spark.apache.org>
Subject:  Re: problem with DataFrame df.withColumn()
org.apache.spark.sql.AnalysisException: resolved attribute(s) missing

>> I am trying to implement the org.apache.spark.ml.Transformer interface in
>> Java 8.
>> My understanding is the pseudocode for transformers is something like
>> @Override
>> 
>>     public DataFrame transform(DataFrame df) {
>> 
>> 1. Select the input column
>> 
>> 2. Create a new column
>> 
>> 3. Append the new column to the df argument and return
>> 
>>    }
> 
> 
> The following line can be used inside of the transform function to return a
> DataFrame that has been augmented with a new column using the stem lambda
> function (defined as a UDF below).
> return df.withColumn("filteredInput", expr("stem(rawInput)"));
> This is producing a new column called filteredInput (that is appended to
> whatever columns are already there) by passing the column rawInput to your
> arbitrary lambda function.
>  
>> Based on my experience the current DataFrame API is very limited. You
>> cannot apply a complicated lambda function. As a workaround I convert the
>> DataFrame to a JavaRDD, apply my complicated lambda, and then convert the
>> resulting RDD back to a DataFrame.
> 
> 
> This is exactly what this code is doing.  You are defining an arbitrary lambda
> function as a UDF.  The difference here, when compared to a JavaRDD map, is
> that you can use this UDF to append columns without having to manually append
> the new data to some existing object.
> sqlContext.udf().register("stem", new UDF1<String, String>() {
>   @Override
>   public String call(String str) {
>     return // TODO: stemming code here
>   }
> }, DataTypes.StringType);
>> Now I select the “new column” from the DataFrame and try to call
>> df.withColumn().
>> 
>> 
>> 
>> I can try and implement this as a UDF. However, I need to use several 3rd
>> party jars. Any idea how to ensure the workers will have the required jar
>> files? If I was submitting a normal Java app I would create an uber jar;
>> will this work with UDFs?
> 
> 
> Yeah, UDFs are run the same way as your RDD lambda functions.



Re: problem with DataFrame df.withColumn() org.apache.spark.sql.AnalysisException: resolved attribute(s) missing

Posted by Michael Armbrust <mi...@databricks.com>.
>
> I am trying to implement the org.apache.spark.ml.Transformer interface in
> Java 8.
>
My understanding is the pseudocode for transformers is something like
>
> @Override
>
>     public DataFrame transform(DataFrame df) {
>
> 1. Select the input column
>
> 2. Create a new column
>
> 3. Append the new column to the df argument and return
>
>    }
>

The following line can be used inside of the transform function to return a
DataFrame that has been augmented with a new column using the stem lambda
function (defined as a UDF below).

return df.withColumn("filteredInput", expr("stem(rawInput)"));

This is producing a new column called filteredInput (that is appended to
whatever columns are already there) by passing the column rawInput to your
arbitrary lambda function.
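
(expr above is the static helper from org.apache.spark.sql.functions, i.e.
import static org.apache.spark.sql.functions.expr;)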


> Based on my experience the current DataFrame API is very limited. You
> cannot apply a complicated lambda function. As a workaround I convert the
> DataFrame to a JavaRDD, apply my complicated lambda, and then convert the
> resulting RDD back to a DataFrame.
>

This is exactly what this code is doing.  You are defining an arbitrary
lambda function as a UDF.  The difference here, when compared to a JavaRDD
map, is that you can use this UDF to append columns without having to
manually append the new data to some existing object.

sqlContext.udf().register("stem", new UDF1<String, String>() {
  @Override
  public String call(String str) {
    return // TODO: stemming code here
  }
}, DataTypes.StringType);

Now I select the “new column” from the DataFrame and try to call
> df.withColumn().
>
>
> I can try and implement this as a UDF. However, I need to use several 3rd
> party jars. Any idea how to ensure the workers will have the required jar
> files? If I was submitting a normal Java app I would create an uber jar;
> will this work with UDFs?
>

Yeah, UDFs are run the same way as your RDD lambda functions.
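
For example, a sketch with made-up jar and class names: either build one uber
jar, or list the 3rd party jars explicitly and spark-submit will ship them to
the executors:

    # option 1: everything shaded into a single uber jar
    spark-submit --class com.pws.poc.ml.App my-app-uber.jar

    # option 2: extra jars are copied to each executor
    spark-submit --class com.pws.poc.ml.App \
      --jars stemmer-lib.jar,other-dep.jar \
      my-app.jar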

Re: problem with DataFrame df.withColumn() org.apache.spark.sql.AnalysisException: resolved attribute(s) missing

Posted by Andy Davidson <An...@SantaCruzIntegration.com>.
Hi Michael

I am not sure you understood my code correctly.

I am trying to implement the org.apache.spark.ml.Transformer interface in
Java 8.


My understanding is the pseudocode for transformers is something like
@Override

    public DataFrame transform(DataFrame df) {

1. Select the input column

2. Create a new column

3. Append the new column to the df argument and return

   }



Based on my experience the current DataFrame API is very limited. You cannot
apply a complicated lambda function. As a workaround I convert the DataFrame
to a JavaRDD, apply my complicated lambda, and then convert the resulting
RDD back to a DataFrame.
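
For illustration, a sketch of what that round trip would need to look like
(names reuse inputCol/outputCol from the code at the top of this thread): the
rebuilt DataFrame has to carry every column that is still needed, because a
Column taken from one DataFrame cannot later be combined with another one:

    JavaRDD<Row> outRowRDD = df.select(inputCol).javaRDD()
        .map((Row row) -> RowFactory.create(row.get(0), "TODO stemmed"));
    StructType schema = DataTypes.createStructType(Arrays.asList(
        DataTypes.createStructField(inputCol,
            df.schema().apply(inputCol).dataType(), true),
        DataTypes.createStructField(outputCol, DataTypes.StringType, true)));
    // both columns live in the same new DataFrame, so nothing is borrowed
    // from a different query plan
    DataFrame outputDF = sqlContext.createDataFrame(outRowRDD, schema);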



Now I select the “new column” from the DataFrame and try to call
df.withColumn().



I can try and implement this as a UDF. However, I need to use several 3rd
party jars. Any idea how to ensure the workers will have the required jar
files? If I was submitting a normal Java app I would create an uber jar;
will this work with UDFs?



Kind regards



Andy



From:  Michael Armbrust <mi...@databricks.com>
Date:  Monday, January 4, 2016 at 11:14 PM
To:  Andrew Davidson <An...@SantaCruzIntegration.com>
Cc:  "user @spark" <us...@spark.apache.org>
Subject:  Re: problem with DataFrame df.withColumn()
org.apache.spark.sql.AnalysisException: resolved attribute(s) missing

> It's not really possible to convert an RDD to a Column.  You can think of a
> Column as an expression that produces a single output given some set of input
> columns.  If I understand your code correctly, I think this might be easier to
> express as a UDF:
> sqlContext.udf().register("stem", new UDF1<String, String>() {
>   @Override
>   public String call(String str) {
>     return // TODO: stemming code here
>   }
> }, DataTypes.StringType);
> DataFrame transformed = df.withColumn("filteredInput",
> expr("stem(rawInput)"));
> 
> On Mon, Jan 4, 2016 at 8:08 PM, Andy Davidson <An...@santacruzintegration.com>
> wrote:
>> I am having a heck of a time writing a simple transformer in Java. I assume
>> that my Transformer is supposed to append a new column to the DataFrame
>> argument. Any idea why I get the following exception in Java 8 when I try to
>> call DataFrame withColumn()? The JavaDoc says withColumn() "Returns a new
>> DataFrame
>> <http://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/DataFrame.html>
>> by adding a column or replacing the existing column that has the same
>> name.”
>> 
>> 
>> Also, do transformers always run in the driver? If not, I assume workers do
>> not have the sqlContext. Any idea how I can convert a JavaRDD<> to a Column
>> without a sqlContext?
>> 
>> Kind regards
>> 
>> Andy
>> 
>> P.S. I am using Spark 1.6.0


Re: problem with DataFrame df.withColumn() org.apache.spark.sql.AnalysisException: resolved attribute(s) missing

Posted by Michael Armbrust <mi...@databricks.com>.
It's not really possible to convert an RDD to a Column.  You can think of a
Column as an expression that produces a single output given some set of
input columns.  If I understand your code correctly, I think this might be
easier to express as a UDF:

sqlContext.udf().register("stem", new UDF1<String, String>() {
  @Override
  public String call(String str) {
    return // TODO: stemming code here
  }
}, DataTypes.StringType);

DataFrame transformed = df.withColumn("filteredInput", expr("stem(rawInput)"));


On Mon, Jan 4, 2016 at 8:08 PM, Andy Davidson <Andy@santacruzintegration.com
> wrote:

> I am having a heck of a time writing a simple transformer in Java. I
> assume that my Transformer is supposed to append a new column to the
> DataFrame argument. Any idea why I get the following exception in Java 8
> when I try to call DataFrame withColumn()? The JavaDoc says withColumn() "Returns
> a new DataFrame
> <http://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/DataFrame.html>
>  by adding a column or replacing the existing column that has the same
> name.”
>
>
> Also, do transformers always run in the driver? If not, I assume workers do
> not have the sqlContext. Any idea how I can convert a JavaRDD<> to a
> Column without a sqlContext?
>
> Kind regards
>
> Andy
>
> P.S. I am using Spark 1.6.0