Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/03 21:55:01 UTC

[GitHub] [beam] kennknowles opened a new issue, #18984: [SQL] Output schema is not set incorrectly

kennknowles opened a new issue, #18984:
URL: https://github.com/apache/beam/issues/18984

   *From: https://stackoverflow.com/questions/52181795/how-do-i-get-an-output-schema-for-an-apache-beam-sql-query :*
   
   I've been playing with the Beam SQL DSL, and I can't use the output of a query without manually providing a coder that knows the output schema. Can I infer the output schema rather than hardcoding it?
   
   Neither the walkthrough nor the examples actually use the output of a query. I'm using Scio rather than the plain Java API to keep the code relatively readable and concise; I don't think that makes a difference for this question.
   
   Here's an example of what I mean.
   
   Given an input schema inSchema and some data source that is mapped onto a Row as follows (Avro-based in this example, but again, I don't think that matters):
   
   ```
   
   sc.avroFile[Foo](args("input"))
      .map(fooToRow)
      .setCoder(inSchema.getRowCoder)
      .applyTransform(SqlTransform.query("SELECT COUNT(1) FROM PCOLLECTION"))
      .saveAsTextFile(args("output"))
   
   ```
   
   
   Running this pipeline results in a KryoException as follows:
   
   ```
   
   com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
   Serialization trace:
   fieldIndices (org.apache.beam.sdk.schemas.Schema)
   schema (org.apache.beam.sdk.values.RowWithStorage)
   org.apache.beam.sdk.Pipeline$PipelineExecutionException: com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
   
   ```
   
   
   However, suppose I insert a RowCoder matching the SQL output, in this case a single int64 count column:
   
   ```
   
      ...snip...
      .applyTransform(SqlTransform.query("SELECT COUNT(1) FROM PCOLLECTION"))
      .setCoder(Schema.builder().addInt64Field("count").build().getRowCoder)
      .saveAsTextFile(args("output"))
   
   ```
   
   
   Now the pipeline runs just fine.
   
   Having to manually tell the pipeline how to encode the SQL output seems unnecessary, given that we specify the input schema/coder(s) and a query. It seems to me that we should be able to infer the output schema from that - but I can't see how, other than maybe using Calcite directly?
   
   Before raising a ticket on the Beam Jira, I thought I'd check I wasn't missing something obvious!
   
   
   
   Imported from Jira [BEAM-5335](https://issues.apache.org/jira/browse/BEAM-5335). Original Jira may contain additional context.
   Reported by: kedin.

