Posted to issues@flink.apache.org by "Fabian Hueske (JIRA)" <ji...@apache.org> on 2018/04/16 12:26:00 UTC

[jira] [Comment Edited] (FLINK-9166) Performance issue with Flink SQL

    [ https://issues.apache.org/jira/browse/FLINK-9166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439361#comment-16439361 ] 

Fabian Hueske edited comment on FLINK-9166 at 4/16/18 12:25 PM:
----------------------------------------------------------------

Thanks for opening this issue [~ssubbu@gmail.com].
I think this has nothing to do with SQL but rather with the number of jobs running on a single JM.
The JM is implemented as an Actor and handles certain communication tasks synchronously in its main thread. Running that many jobs on one JM can cause it to become unresponsive.

I'd suggest distributing the queries over more Flink JMs / clusters. Since you run this on a resource manager such as YARN, it should be fairly easy to start new clusters in session mode.
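
For example, a rough sketch using Flink's YARN session tooling (the container counts, memory sizes, jar name, and application ids below are placeholders, not recommendations):

{code}
# Start a few independent session clusters, each with its own JobManager.
./bin/yarn-session.sh -n 30 -jm 4096 -tm 18432 -s 5 -d
./bin/yarn-session.sh -n 30 -jm 4096 -tm 18432 -s 5 -d

# Submit a subset of the queries to each session cluster.
./bin/flink run -yid application_1523880000000_0001 my-sql-job.jar
./bin/flink run -yid application_1523880000000_0002 my-sql-job.jar
{code}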

Best, Fabian


was (Author: fhueske):
Thanks for opening this issue [~ssubbu@gmail.com].
I think this has nothing to do with SQL but rather with the number of jobs running on a single JM.
The JM is implemented as an Actor and handles certain communication tasks synchronously in its main thread. Running that many jobs on one JM can cause it to become unresponsive.

I'd suggest distributing the queries over more Flink JMs / clusters. If you run this on a resource manager such as YARN, it is fairly easy to start a new cluster.

Best, Fabian

> Performance issue with Flink SQL
> --------------------------------
>
>                 Key: FLINK-9166
>                 URL: https://issues.apache.org/jira/browse/FLINK-9166
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.4.2
>            Reporter: SUBRAMANYA SURESH
>            Priority: Major
>              Labels: flink, graph, performance, sql, yarn
>
> With a high number of Flink SQL queries (100 instances of the query below), the Flink command line client fails with a "JobManager did not respond within 600000 ms" error on a YARN cluster.
>  * The JobManager log shows nothing after the last TaskManager started, except DEBUG messages such as "job with ID 5cd95f89ed7a66ec44f2d19eca0592f7 not found in JobManager", indicating it is likely stuck (creating the ExecutionGraph?).
>  * The same program works locally as a standalone Java program (with high CPU initially).
>  * Note: each Row in structStream contains 515 columns (many end up null), including a column that holds the raw message.
>  * In the YARN cluster we specify 18 GB for each TaskManager, 18 GB for the JobManager, 145 TaskManagers with 5 slots each, and a parallelism of 725 (the number of partitions in our Kafka source); see the configuration sketch below.
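> The settings above roughly correspond to the following flink-conf.yaml entries (a sketch assuming Flink 1.4's configuration keys; the number of TaskManagers is requested from YARN at startup rather than configured here):
> {code}
> jobmanager.heap.mb: 18432
> taskmanager.heap.mb: 18432
> taskmanager.numberOfTaskSlots: 5
> parallelism.default: 725
> {code}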
> *Query:*
> {code:java}
>  select count(*), 'idnumber' as criteria, Environment, CollectedTimestamp, EventTimestamp, RawMsg, Source 
>  from structStream
>  where Environment='MyEnvironment' and Rule='MyRule' and LogType='MyLogType' and Outcome='Success'
>  group by tumble(proctime, INTERVAL '1' SECOND), Environment, CollectedTimestamp, EventTimestamp, RawMsg, Source
> {code}
> *Code:*
> {code:java}
> public static void main(String[] args) throws Exception {
>  FileSystems.newFileSystem(KafkaReadingStreamingJob.class.getResource(WHITELIST_CSV).toURI(), new HashMap<>());
>  final StreamExecutionEnvironment streamingEnvironment = getStreamExecutionEnvironment();
>  final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(streamingEnvironment);
>  final DataStream<Row> structStream = getKafkaStreamOfRows(streamingEnvironment);
>  tableEnv.registerDataStream("structStream", structStream);
>  tableEnv.scan("structStream").printSchema();
>  for (int i = 0; i < 100; i++) {
>    for (String query : Queries.sample) {
>      // Queries.sample contains the single query shown above.
>      Table selectQuery = tableEnv.sqlQuery(query);
>      DataStream<Row> selectQueryStream = tableEnv.toAppendStream(selectQuery,  Row.class);
>      selectQueryStream.print();
>    }
>  }
>  // execute program
>  streamingEnvironment.execute("Kafka Streaming SQL");
> }
> private static DataStream<Row> getKafkaStreamOfRows(StreamExecutionEnvironment environment) throws Exception {
>   Properties properties = getKafkaProperties();
>   // TestDeserializer deserializes the JSON to a ROW of string columns (515)
>   // and also adds a column for the raw message. 
>   FlinkKafkaConsumer011<Row> consumer = new FlinkKafkaConsumer011<>(KAFKA_TOPIC_TO_CONSUME, new TestDeserializer(getRowTypeInfo()), properties);
>   DataStream<Row> stream = environment.addSource(consumer);
>   return stream;
> }
> private static RowTypeInfo getRowTypeInfo() throws Exception {
>   // This has 515 fields. 
>   List<String> fieldNames = DDIManager.getDDIFieldNames();
>   fieldNames.add("rawkafka"); // rawMessage added by TestDeserializer
>   fieldNames.add("proctime");
>   // Fill typeInformationArray with String type info for all fields except the last, which is of type Time.
>   .....
>   return new RowTypeInfo(typeInformationArray, fieldNamesArray);
> }
> private static StreamExecutionEnvironment getStreamExecutionEnvironment() throws IOException {
>   final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>   env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
>   env.enableCheckpointing(60000);
>   env.setStateBackend(new FsStateBackend(CHECKPOINT_DIR));
>   env.setParallelism(725);
>   return env;
> }
> {code}
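> A hedged sketch of how the queries could instead be split across several separate submissions, one slice per session cluster as suggested in the comment above (QUERIES_PER_JOB and the batch-index argument are illustrative assumptions, and Queries.sample is assumed to be a List<String>):
> {code:java}
> private static final int QUERIES_PER_JOB = 10; // assumed slice size
> 
> public static void main(String[] args) throws Exception {
>   // Which slice of the query list this submission runs, e.g. "flink run job.jar 3".
>   int batch = Integer.parseInt(args[0]);
>   final StreamExecutionEnvironment env = getStreamExecutionEnvironment();
>   final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
>   tableEnv.registerDataStream("structStream", getKafkaStreamOfRows(env));
>   List<String> queries = Queries.sample; // the full list of queries
>   int from = batch * QUERIES_PER_JOB;
>   int to = Math.min(from + QUERIES_PER_JOB, queries.size());
>   for (String query : queries.subList(from, to)) {
>     tableEnv.toAppendStream(tableEnv.sqlQuery(query), Row.class).print();
>   }
>   // Each submission produces a small job graph instead of one graph with 100 queries.
>   env.execute("Kafka Streaming SQL batch " + batch);
> }
> {code}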



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)