You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by jie mei <me...@gmail.com> on 2021/03/24 12:33:20 UTC

flink sql jmh failure

Hi, Community

I ran a JMH benchmark task and got the below error. The benchmark uses Flink SQL to consume
data from the data-gen connector (10_000_000 rows) and write the data to ClickHouse. Below
is part of the log; you can see the complete log in the attached file

*My jmh benchmark code as blew:*

@Benchmark
@Threads(1)
@Fork(1)
public void sinkBenchmark() throws IOException {

  StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment
      .getExecutionEnvironment();
  streamEnv.enableCheckpointing(60000);

  EnvironmentSettings settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode().build();
  TableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv,
settings);

  // create clickhouse table
  new ClickHouseTableBuilder(tableEnv,
      parseSchema("clickhouse_sink_table.sql"))
      .database("benchmark")
      .table("bilophus_sink_benchmark")
      .address("jdbc:clickhouse://localhost:8123")
      .build();

  // create mock data table
  tableEnv.executeSql(
      parseSchema("clickhouse_source_table.sql") +
          "WITH (" +
          "'connector' = 'datagen'," +
          "'number-of-rows' = '10000000')");

  tableEnv.executeSql(
      "INSERT INTO CLICKHOUSE_SINK_BENCHMARK SELECT '2020-12-12', *
FROM CLICKHOUSE_SOURCE_BENCHMARK");

}

*running command:*

mvn clean package -DskipTests

<plugin>
  <groupId>org.codehaus.mojo</groupId>
  <artifactId>exec-maven-plugin</artifactId>
  <version>1.6.0</version>
  <executions>
    <execution>
      <id>test-benchmarks</id>
      <phase>test</phase>
      <goals>
        <goal>exec</goal>
      </goals>
    </execution>
  </executions>
  <configuration>
    <skip>false</skip>
    <classpathScope>test</classpathScope>
    <executable>java</executable>
    <arguments>
      <argument>-Xmx6g</argument>
      <argument>-classpath</argument>
      <classpath/>
      <argument>org.openjdk.jmh.Main</argument>
      <!--shouldFailOnError-->
      <argument>-foe</argument>
      <argument>true</argument>
      <!--speed up tests-->
      <argument>-f</argument>
      <argument>1</argument>
      <argument>-i</argument>
      <argument>1</argument>
      <argument>-wi</argument>
      <argument>0</argument>
      <argument>-rf</argument>
      <argument>csv</argument>
      <argument>.*</argument>
    </arguments>
  </configuration>
</plugin>


Non-finished threads:

Thread[Source: TableSourceScan(table=[[default_catalog, default_database,
CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint, second_bigint,
first_int, second_int, first_float, second_float, first_double,
second_double, first_string, s
econd_string]) -> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0,
first_bigint, second_bigint, first_int, second_int, first_float,
second_float, first_double, second_double, first_string, second_string]) ->
Sink: Sink(table=[default_catal
og.default_database.CLICKHOUSE_SINK_BENCHMARK], fields=[dt, first_bigint,
second_bigint, first_int, second_int, first_float, second_float,
first_double, second_double, first_string, second_string]) (1/6),5,Flink
Task Threads]
 at sun.misc.Unsafe.park(Native Method)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
 at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)

 at
org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:146)

 at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:298)

 at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:183)

 at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)

 at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)

 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
 at java.lang.Thread.run(Thread.java:748)

Thread[flink-akka.actor.default-dispatcher-8,5,main]
 at sun.misc.Unsafe.park(Native Method)
 at akka.dispatch.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
 at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Thread[flink-akka.actor.default-dispatcher-2,5,main]
 at sun.misc.Unsafe.park(Native Method)
 at akka.dispatch.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
 at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Thread[Source: TableSourceScan(table=[[default_catalog, default_database,
CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint, second_bigint,
first_int, second_int, first_float, second_float, first_double,
second_double, first_string, s
econd_string]) -> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0,
first_bigint, second_bigint, first_int, second_int, first_float,
second_float, first_double, second_double, first_string, second_string]) ->
Sink: Sink(table=[default_catal
og.default_database.CLICKHOUSE_SINK_BENCHMARK], fields=[dt, first_bigint,
second_bigint, first_int, second_int, first_float, second_float,
first_double, second_double, first_string, second_string]) (4/6),5,Flink
Task Threads]
 at sun.misc.Unsafe.park(Native Method)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
 at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)

 at
org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:146)

 at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:298)

 at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:183)

 at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)

 at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)

 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
 at java.lang.Thread.run(Thread.java:748)


-- 

*Best Regards*
*Jeremy Mei*

Re: flink sql jmh failure

Posted by jie mei <me...@gmail.com>.
HI, Guowei

yeah, I think so too. There is currently no way to trigger a checkpoint and wait for the
checkpoint to finish, so I will do the benchmark with a lower-level API.


Guowei Ma <gu...@gmail.com> 于2021年3月25日周四 下午4:59写道:

> Hi,
> I am not an expert of JMH but it seems that it is not an error. From the
> log it looks like that the job is not finished.
> The data source continues to read data when JMH finishes.
>
> Thread[Legacy Source Thread - Source:
> TableSourceScan(table=[[default_catalog, default_database,
> CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint, second_bigint,
> first_int, second_int, first_float, second_float, first_double,
> second_double, first_string, second_string]) ->
> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0, first_bigint, second_bigint,
> first_int, second_int, first_float, second_float, first_double,
> second_double, first_string, second_string]) -> Sink:
> Sink(table=[default_catalog.default_database.CLICKHOUSE_SINK_BENCHMARK],
> fields=[dt, first_bigint, second_bigint, first_int, second_int,
> first_float, second_float, first_double, second_double, first_string,
> second_string]) (3/6),5,Flink Task Threads]
>   at
> org.apache.flink.table.data.binary.BinaryStringData.fromString(BinaryStringData.java:82)
>   at org.apache.flink.table.data.StringData.fromString(StringData.java:52)
>   at
> org.apache.flink.table.factories.DataGenTableSourceFactory$1.next(DataGenTableSourceFactory.java:171)
>   at
> org.apache.flink.table.factories.DataGenTableSourceFactory$1.next(DataGenTableSourceFactory.java:168)
>   at
> org.apache.flink.table.factories.DataGenTableSourceFactory$RowGenerator.next(DataGenTableSourceFactory.java:320)
>   at
> org.apache.flink.table.factories.DataGenTableSourceFactory$RowGenerator.next(DataGenTableSourceFactory.java:277)
>   at
> org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource.run(DataGeneratorSource.java:82)
>   at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>   at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>   at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
>
> Best,
> Guowei
>
>
> On Wed, Mar 24, 2021 at 9:56 PM jie mei <me...@gmail.com> wrote:
>
>> Hi, Yik San
>>
>> I use a library wroten by myself and trying to verify the performance.
>>
>>
>> Yik San Chan <ev...@gmail.com> 于2021年3月24日周三 下午9:07写道:
>>
>>> Hi Jie,
>>>
>>> I am curious what library do you use to get the ClickHouseTableBuilder
>>>
>>> On Wed, Mar 24, 2021 at 8:41 PM jie mei <me...@gmail.com> wrote:
>>>
>>>> Hi, Community
>>>>
>>>> I run a jmh benchmark task get blew error, which use flink sql
>>>> consuming data from data-gen connector(10_000_000) and write data to
>>>> clickhouse. blew is partly log and you can see completable log by attached
>>>> file
>>>>
>>>> *My jmh benchmark code as blew:*
>>>>
>>>> @Benchmark
>>>> @Threads(1)
>>>> @Fork(1)
>>>> public void sinkBenchmark() throws IOException {
>>>>
>>>>   StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment
>>>>       .getExecutionEnvironment();
>>>>   streamEnv.enableCheckpointing(60000);
>>>>
>>>>   EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>>>       .useBlinkPlanner()
>>>>       .inStreamingMode().build();
>>>>   TableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
>>>>
>>>>   // create clickhouse table
>>>>   new ClickHouseTableBuilder(tableEnv,
>>>>       parseSchema("clickhouse_sink_table.sql"))
>>>>       .database("benchmark")
>>>>       .table("bilophus_sink_benchmark")
>>>>       .address("jdbc:clickhouse://localhost:8123")
>>>>       .build();
>>>>
>>>>   // create mock data table
>>>>   tableEnv.executeSql(
>>>>       parseSchema("clickhouse_source_table.sql") +
>>>>           "WITH (" +
>>>>           "'connector' = 'datagen'," +
>>>>           "'number-of-rows' = '10000000')");
>>>>
>>>>   tableEnv.executeSql(
>>>>       "INSERT INTO CLICKHOUSE_SINK_BENCHMARK SELECT '2020-12-12', * FROM CLICKHOUSE_SOURCE_BENCHMARK");
>>>>
>>>> }
>>>>
>>>> *running command:*
>>>>
>>>> mvn clean package -DskipTests
>>>>
>>>> <plugin>
>>>>   <groupId>org.codehaus.mojo</groupId>
>>>>   <artifactId>exec-maven-plugin</artifactId>
>>>>   <version>1.6.0</version>
>>>>   <executions>
>>>>     <execution>
>>>>       <id>test-benchmarks</id>
>>>>       <phase>test</phase>
>>>>       <goals>
>>>>         <goal>exec</goal>
>>>>       </goals>
>>>>     </execution>
>>>>   </executions>
>>>>   <configuration>
>>>>     <skip>false</skip>
>>>>     <classpathScope>test</classpathScope>
>>>>     <executable>java</executable>
>>>>     <arguments>
>>>>       <argument>-Xmx6g</argument>
>>>>       <argument>-classpath</argument>
>>>>       <classpath/>
>>>>       <argument>org.openjdk.jmh.Main</argument>
>>>>       <!--shouldFailOnError-->
>>>>       <argument>-foe</argument>
>>>>       <argument>true</argument>
>>>>       <!--speed up tests-->
>>>>       <argument>-f</argument>
>>>>       <argument>1</argument>
>>>>       <argument>-i</argument>
>>>>       <argument>1</argument>
>>>>       <argument>-wi</argument>
>>>>       <argument>0</argument>
>>>>       <argument>-rf</argument>
>>>>       <argument>csv</argument>
>>>>       <argument>.*</argument>
>>>>     </arguments>
>>>>   </configuration>
>>>> </plugin>
>>>>
>>>>
>>>> Non-finished threads:
>>>>
>>>> Thread[Source: TableSourceScan(table=[[default_catalog,
>>>> default_database, CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint,
>>>> second_bigint, first_int, second_int, first_float, second_float,
>>>> first_double, second_double, first_string, s
>>>> econd_string]) -> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0,
>>>> first_bigint, second_bigint, first_int, second_int, first_float,
>>>> second_float, first_double, second_double, first_string, second_string]) ->
>>>> Sink: Sink(table=[default_catal
>>>> og.default_database.CLICKHOUSE_SINK_BENCHMARK], fields=[dt,
>>>> first_bigint, second_bigint, first_int, second_int, first_float,
>>>> second_float, first_double, second_double, first_string, second_string])
>>>> (1/6),5,Flink Task Threads]
>>>>  at sun.misc.Unsafe.park(Native Method)
>>>>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>>>>  at
>>>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>>>>
>>>>  at
>>>> org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:146)
>>>>
>>>>  at
>>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:298)
>>>>
>>>>  at
>>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:183)
>>>>
>>>>  at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)
>>>>
>>>>  at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
>>>>
>>>>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>>>  at java.lang.Thread.run(Thread.java:748)
>>>>
>>>> Thread[flink-akka.actor.default-dispatcher-8,5,main]
>>>>  at sun.misc.Unsafe.park(Native Method)
>>>>  at akka.dispatch.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
>>>>  at
>>>> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>  at
>>>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>
>>>>
>>>> Thread[flink-akka.actor.default-dispatcher-2,5,main]
>>>>  at sun.misc.Unsafe.park(Native Method)
>>>>  at akka.dispatch.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
>>>>  at
>>>> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>  at
>>>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>
>>>>
>>>> Thread[Source: TableSourceScan(table=[[default_catalog,
>>>> default_database, CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint,
>>>> second_bigint, first_int, second_int, first_float, second_float,
>>>> first_double, second_double, first_string, s
>>>> econd_string]) -> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0,
>>>> first_bigint, second_bigint, first_int, second_int, first_float,
>>>> second_float, first_double, second_double, first_string, second_string]) ->
>>>> Sink: Sink(table=[default_catal
>>>> og.default_database.CLICKHOUSE_SINK_BENCHMARK], fields=[dt,
>>>> first_bigint, second_bigint, first_int, second_int, first_float,
>>>> second_float, first_double, second_double, first_string, second_string])
>>>> (4/6),5,Flink Task Threads]
>>>>  at sun.misc.Unsafe.park(Native Method)
>>>>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>>>>  at
>>>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>>>>
>>>>  at
>>>> org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:146)
>>>>
>>>>  at
>>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:298)
>>>>
>>>>  at
>>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:183)
>>>>
>>>>  at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)
>>>>
>>>>  at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
>>>>
>>>>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>>>  at java.lang.Thread.run(Thread.java:748)
>>>>
>>>>
>>>> --
>>>>
>>>> *Best Regards*
>>>> *Jeremy Mei*
>>>>
>>>
>>
>> --
>>
>> *Best Regards*
>> *Jeremy Mei*
>>
>

-- 

*Best Regards*
*Jeremy Mei*

Re: flink sql jmh failure

Posted by Guowei Ma <gu...@gmail.com>.
Hi,
I am not an expert of JMH but it seems that it is not an error. From the
log it looks like that the job is not finished.
The data source continues to read data when JMH finishes.

Thread[Legacy Source Thread - Source:
TableSourceScan(table=[[default_catalog, default_database,
CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint, second_bigint,
first_int, second_int, first_float, second_float, first_double,
second_double, first_string, second_string]) ->
Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0, first_bigint, second_bigint,
first_int, second_int, first_float, second_float, first_double,
second_double, first_string, second_string]) -> Sink:
Sink(table=[default_catalog.default_database.CLICKHOUSE_SINK_BENCHMARK],
fields=[dt, first_bigint, second_bigint, first_int, second_int,
first_float, second_float, first_double, second_double, first_string,
second_string]) (3/6),5,Flink Task Threads]
  at
org.apache.flink.table.data.binary.BinaryStringData.fromString(BinaryStringData.java:82)
  at org.apache.flink.table.data.StringData.fromString(StringData.java:52)
  at
org.apache.flink.table.factories.DataGenTableSourceFactory$1.next(DataGenTableSourceFactory.java:171)
  at
org.apache.flink.table.factories.DataGenTableSourceFactory$1.next(DataGenTableSourceFactory.java:168)
  at
org.apache.flink.table.factories.DataGenTableSourceFactory$RowGenerator.next(DataGenTableSourceFactory.java:320)
  at
org.apache.flink.table.factories.DataGenTableSourceFactory$RowGenerator.next(DataGenTableSourceFactory.java:277)
  at
org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource.run(DataGeneratorSource.java:82)
  at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
  at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
  at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)

Best,
Guowei


On Wed, Mar 24, 2021 at 9:56 PM jie mei <me...@gmail.com> wrote:

> Hi, Yik San
>
> I use a library wroten by myself and trying to verify the performance.
>
>
> Yik San Chan <ev...@gmail.com> 于2021年3月24日周三 下午9:07写道:
>
>> Hi Jie,
>>
>> I am curious what library do you use to get the ClickHouseTableBuilder
>>
>> On Wed, Mar 24, 2021 at 8:41 PM jie mei <me...@gmail.com> wrote:
>>
>>> Hi, Community
>>>
>>> I run a jmh benchmark task get blew error, which use flink sql consuming
>>> data from data-gen connector(10_000_000) and write data to clickhouse. blew
>>> is partly log and you can see completable log by attached file
>>>
>>> *My jmh benchmark code as blew:*
>>>
>>> @Benchmark
>>> @Threads(1)
>>> @Fork(1)
>>> public void sinkBenchmark() throws IOException {
>>>
>>>   StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment
>>>       .getExecutionEnvironment();
>>>   streamEnv.enableCheckpointing(60000);
>>>
>>>   EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>>       .useBlinkPlanner()
>>>       .inStreamingMode().build();
>>>   TableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
>>>
>>>   // create clickhouse table
>>>   new ClickHouseTableBuilder(tableEnv,
>>>       parseSchema("clickhouse_sink_table.sql"))
>>>       .database("benchmark")
>>>       .table("bilophus_sink_benchmark")
>>>       .address("jdbc:clickhouse://localhost:8123")
>>>       .build();
>>>
>>>   // create mock data table
>>>   tableEnv.executeSql(
>>>       parseSchema("clickhouse_source_table.sql") +
>>>           "WITH (" +
>>>           "'connector' = 'datagen'," +
>>>           "'number-of-rows' = '10000000')");
>>>
>>>   tableEnv.executeSql(
>>>       "INSERT INTO CLICKHOUSE_SINK_BENCHMARK SELECT '2020-12-12', * FROM CLICKHOUSE_SOURCE_BENCHMARK");
>>>
>>> }
>>>
>>> *running command:*
>>>
>>> mvn clean package -DskipTests
>>>
>>> <plugin>
>>>   <groupId>org.codehaus.mojo</groupId>
>>>   <artifactId>exec-maven-plugin</artifactId>
>>>   <version>1.6.0</version>
>>>   <executions>
>>>     <execution>
>>>       <id>test-benchmarks</id>
>>>       <phase>test</phase>
>>>       <goals>
>>>         <goal>exec</goal>
>>>       </goals>
>>>     </execution>
>>>   </executions>
>>>   <configuration>
>>>     <skip>false</skip>
>>>     <classpathScope>test</classpathScope>
>>>     <executable>java</executable>
>>>     <arguments>
>>>       <argument>-Xmx6g</argument>
>>>       <argument>-classpath</argument>
>>>       <classpath/>
>>>       <argument>org.openjdk.jmh.Main</argument>
>>>       <!--shouldFailOnError-->
>>>       <argument>-foe</argument>
>>>       <argument>true</argument>
>>>       <!--speed up tests-->
>>>       <argument>-f</argument>
>>>       <argument>1</argument>
>>>       <argument>-i</argument>
>>>       <argument>1</argument>
>>>       <argument>-wi</argument>
>>>       <argument>0</argument>
>>>       <argument>-rf</argument>
>>>       <argument>csv</argument>
>>>       <argument>.*</argument>
>>>     </arguments>
>>>   </configuration>
>>> </plugin>
>>>
>>>
>>> Non-finished threads:
>>>
>>> Thread[Source: TableSourceScan(table=[[default_catalog,
>>> default_database, CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint,
>>> second_bigint, first_int, second_int, first_float, second_float,
>>> first_double, second_double, first_string, s
>>> econd_string]) -> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0,
>>> first_bigint, second_bigint, first_int, second_int, first_float,
>>> second_float, first_double, second_double, first_string, second_string]) ->
>>> Sink: Sink(table=[default_catal
>>> og.default_database.CLICKHOUSE_SINK_BENCHMARK], fields=[dt,
>>> first_bigint, second_bigint, first_int, second_int, first_float,
>>> second_float, first_double, second_double, first_string, second_string])
>>> (1/6),5,Flink Task Threads]
>>>  at sun.misc.Unsafe.park(Native Method)
>>>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>>>  at
>>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>>>
>>>  at
>>> org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:146)
>>>
>>>  at
>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:298)
>>>
>>>  at
>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:183)
>>>
>>>  at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)
>>>
>>>  at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
>>>
>>>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>>  at java.lang.Thread.run(Thread.java:748)
>>>
>>> Thread[flink-akka.actor.default-dispatcher-8,5,main]
>>>  at sun.misc.Unsafe.park(Native Method)
>>>  at akka.dispatch.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
>>>  at
>>> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>  at
>>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>
>>>
>>> Thread[flink-akka.actor.default-dispatcher-2,5,main]
>>>  at sun.misc.Unsafe.park(Native Method)
>>>  at akka.dispatch.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
>>>  at
>>> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>  at
>>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>
>>>
>>> Thread[Source: TableSourceScan(table=[[default_catalog,
>>> default_database, CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint,
>>> second_bigint, first_int, second_int, first_float, second_float,
>>> first_double, second_double, first_string, s
>>> econd_string]) -> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0,
>>> first_bigint, second_bigint, first_int, second_int, first_float,
>>> second_float, first_double, second_double, first_string, second_string]) ->
>>> Sink: Sink(table=[default_catal
>>> og.default_database.CLICKHOUSE_SINK_BENCHMARK], fields=[dt,
>>> first_bigint, second_bigint, first_int, second_int, first_float,
>>> second_float, first_double, second_double, first_string, second_string])
>>> (4/6),5,Flink Task Threads]
>>>  at sun.misc.Unsafe.park(Native Method)
>>>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>>>  at
>>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>>>
>>>  at
>>> org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:146)
>>>
>>>  at
>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:298)
>>>
>>>  at
>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:183)
>>>
>>>  at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)
>>>
>>>  at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
>>>
>>>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>>  at java.lang.Thread.run(Thread.java:748)
>>>
>>>
>>> --
>>>
>>> *Best Regards*
>>> *Jeremy Mei*
>>>
>>
>
> --
>
> *Best Regards*
> *Jeremy Mei*
>

Re: flink sql jmh failure

Posted by jie mei <me...@gmail.com>.
Hi, Yik San

I use a library written by myself, and I am trying to verify its performance.


Yik San Chan <ev...@gmail.com> 于2021年3月24日周三 下午9:07写道:

> Hi Jie,
>
> I am curious what library do you use to get the ClickHouseTableBuilder
>
> On Wed, Mar 24, 2021 at 8:41 PM jie mei <me...@gmail.com> wrote:
>
>> Hi, Community
>>
>> I run a jmh benchmark task get blew error, which use flink sql consuming
>> data from data-gen connector(10_000_000) and write data to clickhouse. blew
>> is partly log and you can see completable log by attached file
>>
>> *My jmh benchmark code as blew:*
>>
>> @Benchmark
>> @Threads(1)
>> @Fork(1)
>> public void sinkBenchmark() throws IOException {
>>
>>   StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment
>>       .getExecutionEnvironment();
>>   streamEnv.enableCheckpointing(60000);
>>
>>   EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>       .useBlinkPlanner()
>>       .inStreamingMode().build();
>>   TableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
>>
>>   // create clickhouse table
>>   new ClickHouseTableBuilder(tableEnv,
>>       parseSchema("clickhouse_sink_table.sql"))
>>       .database("benchmark")
>>       .table("bilophus_sink_benchmark")
>>       .address("jdbc:clickhouse://localhost:8123")
>>       .build();
>>
>>   // create mock data table
>>   tableEnv.executeSql(
>>       parseSchema("clickhouse_source_table.sql") +
>>           "WITH (" +
>>           "'connector' = 'datagen'," +
>>           "'number-of-rows' = '10000000')");
>>
>>   tableEnv.executeSql(
>>       "INSERT INTO CLICKHOUSE_SINK_BENCHMARK SELECT '2020-12-12', * FROM CLICKHOUSE_SOURCE_BENCHMARK");
>>
>> }
>>
>> *running command:*
>>
>> mvn clean package -DskipTests
>>
>> <plugin>
>>   <groupId>org.codehaus.mojo</groupId>
>>   <artifactId>exec-maven-plugin</artifactId>
>>   <version>1.6.0</version>
>>   <executions>
>>     <execution>
>>       <id>test-benchmarks</id>
>>       <phase>test</phase>
>>       <goals>
>>         <goal>exec</goal>
>>       </goals>
>>     </execution>
>>   </executions>
>>   <configuration>
>>     <skip>false</skip>
>>     <classpathScope>test</classpathScope>
>>     <executable>java</executable>
>>     <arguments>
>>       <argument>-Xmx6g</argument>
>>       <argument>-classpath</argument>
>>       <classpath/>
>>       <argument>org.openjdk.jmh.Main</argument>
>>       <!--shouldFailOnError-->
>>       <argument>-foe</argument>
>>       <argument>true</argument>
>>       <!--speed up tests-->
>>       <argument>-f</argument>
>>       <argument>1</argument>
>>       <argument>-i</argument>
>>       <argument>1</argument>
>>       <argument>-wi</argument>
>>       <argument>0</argument>
>>       <argument>-rf</argument>
>>       <argument>csv</argument>
>>       <argument>.*</argument>
>>     </arguments>
>>   </configuration>
>> </plugin>
>>
>>
>> Non-finished threads:
>>
>> Thread[Source: TableSourceScan(table=[[default_catalog, default_database,
>> CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint, second_bigint,
>> first_int, second_int, first_float, second_float, first_double,
>> second_double, first_string, s
>> econd_string]) -> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0,
>> first_bigint, second_bigint, first_int, second_int, first_float,
>> second_float, first_double, second_double, first_string, second_string]) ->
>> Sink: Sink(table=[default_catal
>> og.default_database.CLICKHOUSE_SINK_BENCHMARK], fields=[dt, first_bigint,
>> second_bigint, first_int, second_int, first_float, second_float,
>> first_double, second_double, first_string, second_string]) (1/6),5,Flink
>> Task Threads]
>>  at sun.misc.Unsafe.park(Native Method)
>>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>>  at
>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>>
>>  at
>> org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:146)
>>
>>  at
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:298)
>>
>>  at
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:183)
>>
>>  at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)
>>
>>  at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
>>
>>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>  at java.lang.Thread.run(Thread.java:748)
>>
>> Thread[flink-akka.actor.default-dispatcher-8,5,main]
>>  at sun.misc.Unsafe.park(Native Method)
>>  at akka.dispatch.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
>>  at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>  at
>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>>
>> Thread[flink-akka.actor.default-dispatcher-2,5,main]
>>  at sun.misc.Unsafe.park(Native Method)
>>  at akka.dispatch.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
>>  at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>  at
>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>>
>> Thread[Source: TableSourceScan(table=[[default_catalog, default_database,
>> CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint, second_bigint,
>> first_int, second_int, first_float, second_float, first_double,
>> second_double, first_string, s
>> econd_string]) -> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0,
>> first_bigint, second_bigint, first_int, second_int, first_float,
>> second_float, first_double, second_double, first_string, second_string]) ->
>> Sink: Sink(table=[default_catal
>> og.default_database.CLICKHOUSE_SINK_BENCHMARK], fields=[dt, first_bigint,
>> second_bigint, first_int, second_int, first_float, second_float,
>> first_double, second_double, first_string, second_string]) (4/6),5,Flink
>> Task Threads]
>>  at sun.misc.Unsafe.park(Native Method)
>>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>>  at
>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>>
>>  at
>> org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:146)
>>
>>  at
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:298)
>>
>>  at
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:183)
>>
>>  at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)
>>
>>  at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
>>
>>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>  at java.lang.Thread.run(Thread.java:748)
>>
>>
>> --
>>
>> *Best Regards*
>> *Jeremy Mei*
>>
>

-- 

*Best Regards*
*Jeremy Mei*

Re: flink sql jmh failure

Posted by Yik San Chan <ev...@gmail.com>.
Hi Jie,

I am curious which library you use to get the ClickHouseTableBuilder.

On Wed, Mar 24, 2021 at 8:41 PM jie mei <me...@gmail.com> wrote:

> Hi, Community
>
> I ran a JMH benchmark task and got the error below. The task uses Flink SQL to consume
> data from the data-gen connector (10_000_000 rows) and write it to ClickHouse. Below
> is part of the log; you can see the complete log in the attached file.
>
> *My JMH benchmark code is as below:*
>
> @Benchmark
> @Threads(1)
> @Fork(1)
> public void sinkBenchmark() throws IOException {
>
>   StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment
>       .getExecutionEnvironment();
>   streamEnv.enableCheckpointing(60000);
>
>   EnvironmentSettings settings = EnvironmentSettings.newInstance()
>       .useBlinkPlanner()
>       .inStreamingMode().build();
>   TableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
>
>   // create clickhouse table
>   new ClickHouseTableBuilder(tableEnv,
>       parseSchema("clickhouse_sink_table.sql"))
>       .database("benchmark")
>       .table("bilophus_sink_benchmark")
>       .address("jdbc:clickhouse://localhost:8123")
>       .build();
>
>   // create mock data table
>   tableEnv.executeSql(
>       parseSchema("clickhouse_source_table.sql") +
>           "WITH (" +
>           "'connector' = 'datagen'," +
>           "'number-of-rows' = '10000000')");
>
>   tableEnv.executeSql(
>       "INSERT INTO CLICKHOUSE_SINK_BENCHMARK SELECT '2020-12-12', * FROM CLICKHOUSE_SOURCE_BENCHMARK");
>
> }
>
> *running command:*
>
> mvn clean package -DskipTests
>
> <plugin>
>   <groupId>org.codehaus.mojo</groupId>
>   <artifactId>exec-maven-plugin</artifactId>
>   <version>1.6.0</version>
>   <executions>
>     <execution>
>       <id>test-benchmarks</id>
>       <phase>test</phase>
>       <goals>
>         <goal>exec</goal>
>       </goals>
>     </execution>
>   </executions>
>   <configuration>
>     <skip>false</skip>
>     <classpathScope>test</classpathScope>
>     <executable>java</executable>
>     <arguments>
>       <argument>-Xmx6g</argument>
>       <argument>-classpath</argument>
>       <classpath/>
>       <argument>org.openjdk.jmh.Main</argument>
>       <!--shouldFailOnError-->
>       <argument>-foe</argument>
>       <argument>true</argument>
>       <!--speed up tests-->
>       <argument>-f</argument>
>       <argument>1</argument>
>       <argument>-i</argument>
>       <argument>1</argument>
>       <argument>-wi</argument>
>       <argument>0</argument>
>       <argument>-rf</argument>
>       <argument>csv</argument>
>       <argument>.*</argument>
>     </arguments>
>   </configuration>
> </plugin>
>
>
> Non-finished threads:
>
> Thread[Source: TableSourceScan(table=[[default_catalog, default_database,
> CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint, second_bigint,
> first_int, second_int, first_float, second_float, first_double,
> second_double, first_string, s
> econd_string]) -> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0,
> first_bigint, second_bigint, first_int, second_int, first_float,
> second_float, first_double, second_double, first_string, second_string]) ->
> Sink: Sink(table=[default_catal
> og.default_database.CLICKHOUSE_SINK_BENCHMARK], fields=[dt, first_bigint,
> second_bigint, first_int, second_int, first_float, second_float,
> first_double, second_double, first_string, second_string]) (1/6),5,Flink
> Task Threads]
>  at sun.misc.Unsafe.park(Native Method)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>
>  at
> org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:146)
>
>  at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:298)
>
>  at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:183)
>
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)
>
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
>
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>  at java.lang.Thread.run(Thread.java:748)
>
> Thread[flink-akka.actor.default-dispatcher-8,5,main]
>  at sun.misc.Unsafe.park(Native Method)
>  at akka.dispatch.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
>  at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>  at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
> Thread[flink-akka.actor.default-dispatcher-2,5,main]
>  at sun.misc.Unsafe.park(Native Method)
>  at akka.dispatch.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
>  at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>  at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
> Thread[Source: TableSourceScan(table=[[default_catalog, default_database,
> CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint, second_bigint,
> first_int, second_int, first_float, second_float, first_double,
> second_double, first_string, s
> econd_string]) -> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0,
> first_bigint, second_bigint, first_int, second_int, first_float,
> second_float, first_double, second_double, first_string, second_string]) ->
> Sink: Sink(table=[default_catal
> og.default_database.CLICKHOUSE_SINK_BENCHMARK], fields=[dt, first_bigint,
> second_bigint, first_int, second_int, first_float, second_float,
> first_double, second_double, first_string, second_string]) (4/6),5,Flink
> Task Threads]
>  at sun.misc.Unsafe.park(Native Method)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>
>  at
> org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:146)
>
>  at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:298)
>
>  at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:183)
>
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)
>
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
>
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>  at java.lang.Thread.run(Thread.java:748)
>
>
> --
>
> *Best Regards*
> *Jeremy Mei*
>