Posted to issues@flink.apache.org by "SUBRAMANYA SURESH (JIRA)" <ji...@apache.org> on 2018/05/21 19:18:00 UTC

[jira] [Commented] (FLINK-9166) Performance issue with Flink SQL

    [ https://issues.apache.org/jira/browse/FLINK-9166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16482929#comment-16482929 ] 

SUBRAMANYA SURESH commented on FLINK-9166:
------------------------------------------

Thanks, this indeed solved the problem. We were able to go up to 300 queries with this. 

We noticed that throughput degrades as the number of queries grows. In one Flink cluster we reached about 1.3 million EPS with 100 queries; with 150 queries, throughput drops to around 725K EPS. We could potentially increase this with more partitions, and hence more compute, but the limiting factor seems to be that all the operators sit in one box in the execution graph, i.e. we have one huge source->(map1-where1-toRow1-Sink, map2-where2-toRow2-Sink, ...), so I assume all of the () branches run in the same thread in the slot, one after the other. I tried disabling operator chaining to see if I could get more parallelism, but that generated a bigger graph and made performance worse. Note: we have not run a Java profiler. 
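
As a rough sanity check on these figures, the sketch below multiplies events per second by the number of queries, on the assumption (not confirmed by profiling) that every event is evaluated by every query in the chained task. The class and method names are made up for illustration.

```java
final class ThroughputEstimate {
    /** Events per second multiplied by the number of queries each event passes through. */
    static long evaluationsPerSecond(long eventsPerSecond, int queries) {
        return eventsPerSecond * queries;
    }

    public static void main(String[] args) {
        // Figures reported above: ~1.3 million EPS at 100 queries, ~725K EPS at 150 queries.
        long with100 = evaluationsPerSecond(1_300_000L, 100);
        long with150 = evaluationsPerSecond(725_000L, 150);
        System.out.println("100 queries: " + with100 + " query evaluations/s");
        System.out.println("150 queries: " + with150 + " query evaluations/s");
    }
}
```

The two totals come out at 130,000,000 and 108,750,000, i.e. within roughly 16% of each other. That would be consistent with per-event work growing with the number of queries, though it is only a hypothesis until a profiler confirms it.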
 
Note: With 200 queries, the job gets stuck at submission as before, even with a 2GB akka.framesize. Increasing it to 2.5 or 3GB gives us an error from YarnClientDescriptor stating: Deployment took more than 60 seconds. See if the yarn resources are available. 


Two approaches were suggested by [~rmetzger]
"(a) split the queries into Flink jobs of, say 10 queries each
(b) use slot-sharing groups. 
The slot sharing groups are inherited to subsequent operators. So if you set them before the table api / sql query is defined, you should get the expected result.
If you are unable to set the slot sharing group, you could maybe add a no-op MapOperator before the query, so that you can set the group."
(a) could create operational overhead for us; (b) is appealing, but we will have to test to find the right balance for the number of queries per group.
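
Either approach needs the queries partitioned into batches first. Below is a minimal sketch of that step in plain Java, with no Flink dependency; the class name `QueryGrouping`, the "queries-N" naming scheme, and the batch size of 10 are illustrative assumptions, not something we have tested. For (a), each batch would become its own job; for (b), the batch name would be passed to the DataStream API's `slotSharingGroup(String)` on the no-op map placed before that batch's queries.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class QueryGrouping {
    /**
     * Splits queries into consecutive batches of at most batchSize and
     * keys each batch by a generated name such as "queries-0".
     */
    static Map<String, List<String>> group(List<String> queries, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        // LinkedHashMap keeps the batches in the order they were created.
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (int start = 0; start < queries.size(); start += batchSize) {
            int end = Math.min(start + batchSize, queries.size());
            String name = "queries-" + (start / batchSize);
            groups.put(name, new ArrayList<>(queries.subList(start, end)));
        }
        return groups;
    }

    public static void main(String[] args) {
        // Placeholder query strings; the real ones would come from Queries.sample.
        List<String> queries = new ArrayList<>();
        for (int i = 0; i < 25; i++) {
            queries.add("query-" + i);
        }
        // For (a): submit one job per entry. For (b): use the key as the
        // slot sharing group name for the operators of that batch.
        for (Map.Entry<String, List<String>> e : group(queries, 10).entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue().size() + " queries");
        }
    }
}
```

The batch size is the knob to tune: smaller batches mean more groups (and more slots or jobs), larger batches mean longer chains per thread.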
 
Will update as I have more information. 

> Performance issue with Flink SQL
> --------------------------------
>
>                 Key: FLINK-9166
>                 URL: https://issues.apache.org/jira/browse/FLINK-9166
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.4.2
>            Reporter: SUBRAMANYA SURESH
>            Priority: Major
>              Labels: flink, graph, performance, sql, yarn
>
> With a high number of Flink SQL queries (100 of below), the Flink command line client fails with a "JobManager did not respond within 600000 ms" on a Yarn cluster.
>  * The JobManager logs have nothing after the last TaskManager started except DEBUG logs with "job with ID 5cd95f89ed7a66ec44f2d19eca0592f7 not found in JobManager", indicating it's likely stuck (creating the ExecutionGraph?).
>  * The same works as a standalone Java program locally (high CPU initially).
>  * Note: Each Row in structStream contains 515 columns (many end up null) including a column that has the raw message.
>  * In the YARN cluster we specify 18GB for TaskManager, 18GB for the JobManager, 145 TaskManagers with 5 slots each and parallelism of 725 (partitions in our Kafka source).
> *Query:*
> {code:java}
>  select count (*), 'idnumber' as criteria, Environment, CollectedTimestamp, EventTimestamp, RawMsg, Source 
>  from structStream
>  where Environment='MyEnvironment' and Rule='MyRule' and LogType='MyLogType' and Outcome='Success'
>  group by tumble(proctime, INTERVAL '1' SECOND), Environment, CollectedTimestamp, EventTimestamp, RawMsg, Source
> {code}
> *Code:*
> {code:java}
> public static void main(String[] args) throws Exception {
>  FileSystems.newFileSystem(KafkaReadingStreamingJob.class.getResource(WHITELIST_CSV).toURI(), new HashMap<>());
>  final StreamExecutionEnvironment streamingEnvironment = getStreamExecutionEnvironment();
>  final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(streamingEnvironment);
>  final DataStream<Row> structStream = getKafkaStreamOfRows(streamingEnvironment);
>  tableEnv.registerDataStream("structStream", structStream);
>  tableEnv.scan("structStream").printSchema();
>  for (int i = 0; i < 100; i++){
>    for (String query : Queries.sample){
>      // Queries.sample has one query that is above. 
>      Table selectQuery = tableEnv.sqlQuery(query);
>      DataStream<Row> selectQueryStream = tableEnv.toAppendStream(selectQuery,  Row.class);
>      selectQueryStream.print();
>    }
>  }
>  // execute program
>  streamingEnvironment.execute("Kafka Streaming SQL");
> }
> private static DataStream<Row> getKafkaStreamOfRows(StreamExecutionEnvironment environment) throws Exception {
>   Properties properties = getKafkaProperties();
>   // TestDeserializer deserializes the JSON to a ROW of string columns (515)
>   // and also adds a column for the raw message. 
>   FlinkKafkaConsumer011<Row> consumer = new FlinkKafkaConsumer011<>(KAFKA_TOPIC_TO_CONSUME, new TestDeserializer(getRowTypeInfo()), properties);
>   DataStream<Row> stream = environment.addSource(consumer);
>   return stream;
> }
> private static RowTypeInfo getRowTypeInfo() throws Exception {
>   // This has 515 fields. 
>   List<String> fieldNames = DDIManager.getDDIFieldNames();
>   fieldNames.add("rawkafka"); // rawMessage added by TestDeserializer
>   fieldNames.add("proctime");
>   // Fill typeInformationArray with the String type for all but the last field, which is of type Time
>   .....
>   return new RowTypeInfo(typeInformationArray, fieldNamesArray);
> }
> private static StreamExecutionEnvironment getStreamExecutionEnvironment() throws IOException {
>   final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>   env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
>   env.enableCheckpointing(60000);
>   env.setStateBackend(new FsStateBackend(CHECKPOINT_DIR));
>   env.setParallelism(725);
>   return env;
> }
> {code}


