Posted to dev@flink.apache.org by "Miguel Angel (Jira)" <ji...@apache.org> on 2020/03/01 01:46:00 UTC

[jira] [Created] (FLINK-16353) Issue when uploading a Flink job with a streaming SQL query

Miguel Angel created FLINK-16353:
------------------------------------

             Summary: Issue when uploading a Flink job with a streaming SQL query
                 Key: FLINK-16353
                 URL: https://issues.apache.org/jira/browse/FLINK-16353
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API
    Affects Versions: 1.10.0
         Environment: This is my code
{code:java}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

class TestQueries extends Serializable {
  def testQuery(): Unit = {
    // Blink planner in streaming mode
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // Kafka consumer for the input topic (definition elided)
    // ... topic_consumer

    val stream: DataStream[String] = env.addSource(topic_consumer)

    // Convert the stream to DataStream[Row].
    // desJson deserializes the JSON coming from the topic into a Row.
    // rowType is a RowTypeInfo(fieldTypes, fieldNames): the fieldTypes are
    // Strings and the fieldNames are ("user", "name", "lastName").
    val result: DataStream[Row] = stream.map(str => desJson(str))(rowType)

    // Register the stream as a table ("table" is a reserved word in SQL,
    // so the queries quote it with backticks)
    tableEnv.createTemporaryView("table", result)

    // Queries
    val first_query = tableEnv.sqlQuery("SELECT * FROM `table` WHERE name = 'Sansa'")
    val second_query = tableEnv.sqlQuery("SELECT * FROM `table` WHERE lastName = 'Stark'")

    // The exception occurs in the following two lines
    val first_row: DataStream[Row] = tableEnv.toAppendStream[Row](first_query)
    val second_row: DataStream[Row] = tableEnv.toAppendStream[Row](second_query)

    // Send the results to Elasticsearch (sink setup elided)

    env.execute("Test Queries")
  }
}
{code}
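
For reference, a minimal build.sbt consistent with this code could look like the sketch below; the module names and the provided/bundled split are assumptions based on the Flink 1.10.0 distribution layout, not something stated in this report:

{code}
// build.sbt -- hedged sketch; versions and module names assumed, not from the report
ThisBuild / scalaVersion := "2.12.10"

libraryDependencies ++= Seq(
  // "provided": these jars already ship in the cluster's flink-dist lib/,
  // so they should stay out of the fat jar
  "org.apache.flink" %% "flink-scala"                  % "1.10.0" % "provided",
  "org.apache.flink" %% "flink-streaming-scala"        % "1.10.0" % "provided",
  "org.apache.flink" %% "flink-table-api-scala-bridge" % "1.10.0" % "provided",
  "org.apache.flink" %% "flink-table-planner-blink"    % "1.10.0" % "provided",
  // the Kafka connector is not part of the distribution, so it is bundled
  "org.apache.flink" %% "flink-connector-kafka"        % "1.10.0"
)
{code}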

            Reporter: Miguel Angel


I am using the latest Flink version (1.10.0) and sbt (1.3.7). I get this exception when uploading a job with a streaming SQL query:

{code}
Caused by: java.lang.ClassCastException: class org.codehaus.janino.CompilerFactory cannot be cast to class org.codehaus.commons.compiler.ICompilerFactory (org.codehaus.janino.CompilerFactory is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @3270d194; org.codehaus.commons.compiler.ICompilerFactory is in unnamed module of loader 'app')
 at org.codehaus.commons.compiler.CompilerFactoryFactory.getCompilerFactory(CompilerFactoryFactory.java:129)
 at org.codehaus.commons.compiler.CompilerFactoryFactory.getDefaultCompilerFactory(CompilerFactoryFactory.java:79)
 at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:432)
{code}
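
The cast fails because two copies of the Janino/commons-compiler classes are visible at runtime: one inside the submitted fat jar (loaded by Flink's ChildFirstClassLoader) and one on the cluster's own classpath. A commonly suggested mitigation, offered here as an assumption rather than a confirmed fix for this report, is either to keep the planner out of the fat jar (see the build.sbt sketch above) or to force these packages to resolve parent-first in flink-conf.yaml:

{code}
# flink-conf.yaml -- hedged sketch: make Janino classes load from the cluster classpath
classloader.parent-first-patterns.additional: org.codehaus.janino;org.codehaus.commons
{code}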

When I run the main class with *sbt run*, it works perfectly.
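
That *sbt run* works is consistent with the classloading picture above: launched from sbt, the whole program (including Janino) lives in a single application classloader, so no second copy can collide. If the table dependencies cannot be marked provided, one further hedged option, assuming sbt-assembly builds the fat jar, is to exclude the Janino jars from the assembly:

{code}
// build.sbt -- hedged sketch: keep the Janino jars out of the fat jar entirely
assembly / assemblyExcludedJars := (assembly / fullClasspath).value.filter { entry =>
  val name = entry.data.getName
  name.startsWith("janino") || name.startsWith("commons-compiler")
}
{code}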

--
This message was sent by Atlassian Jira
(v8.3.4#803005)