Posted to user@flink.apache.org by Ruben Laguna <ru...@gmail.com> on 2020/10/29 15:13:19 UTC
Table Print SQL Connector
How can I use the Table [Print SQL connector][1]? I tried the
following (batch mode) but it does not give any output:

    EnvironmentSettings settings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
    TableEnvironment tEnv = TableEnvironment.create(settings);

    final LocalDateTime DATE_TIME = LocalDateTime.of(2020, 1, 1, 0, 0);

    Table transactions =
        tEnv.fromValues(
            DataTypes.ROW(
                DataTypes.FIELD("account_id", DataTypes.BIGINT()),
                DataTypes.FIELD("amount", DataTypes.BIGINT()),
                DataTypes.FIELD("transaction_time", DataTypes.TIMESTAMP(3))),
            Row.of(1, 188, DATE_TIME.plusMinutes(12)),
            Row.of(2, 374, DATE_TIME.plusMinutes(47)),
            Row.of(3, 112, DATE_TIME.plusMinutes(36)),
            Row.of(4, 478, DATE_TIME.plusMinutes(3)),
            Row.of(5, 208, DATE_TIME.plusMinutes(8)),
            Row.of(1, 379, DATE_TIME.plusMinutes(53)),
            Row.of(2, 351, DATE_TIME.plusMinutes(32)),
            Row.of(3, 320, DATE_TIME.plusMinutes(31)),
            Row.of(4, 259, DATE_TIME.plusMinutes(19)),
            Row.of(5, 273, DATE_TIME.plusMinutes(42)));

    tEnv.executeSql(
        "CREATE TABLE print_table(account_id BIGINT, amount BIGINT, "
            + "transaction_time TIMESTAMP) WITH ('connector' = 'print')");

    transactions.executeInsert("print_table");

I can "materialize" the results manually and print them out with:

    for (Row row : materialize(transactions.execute())) {
        System.out.println(row);
    }

    private static List<Row> materialize(TableResult results) {
        try (CloseableIterator<Row> resultIterator = results.collect()) {
            return StreamSupport
                .stream(Spliterators.spliteratorUnknownSize(resultIterator, Spliterator.ORDERED), false)
                .collect(Collectors.toList());
        } catch (Exception e) {
            throw new RuntimeException("Failed to materialize results", e);
        }
    }

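
The `materialize` helper above relies on a general JDK idiom: wrapping an `Iterator` in a `Spliterator` so that it can be consumed as a stream and collected into a list. A minimal JDK-only sketch of that idiom follows. The class name `MaterializeSketch` and the sample values are made up for illustration, and a plain `Iterator` stands in for Flink's `CloseableIterator<Row>`, so this version does not close any resources the way the original try-with-resources block does.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

final class MaterializeSketch {

    // Drains any iterator into a list, preserving encounter order.
    static <T> List<T> materialize(Iterator<T> iterator) {
        return StreamSupport
            // The size is unknown up front, so only ORDERED is declared.
            .stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Illustrative stand-in values, not real Flink rows.
        Iterator<String> rows = Arrays.asList("first", "second", "third").iterator();
        materialize(rows).forEach(System.out::println);
    }
}
```

The iterator is consumed by the call, so it cannot be materialized a second time.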
But I would like to know why I can't just use the Print sink.
I've tried with `.inBatchMode()` and with `.inStreamingMode()`, so I
don't think it's that.
Does anybody know of any working example involving the print connector?
[1]: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/print.html
--
/Rubén
Re: Table Print SQL Connector
Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi,
The problem in your case is that you exit before anything is printed
out. The method executeInsert executes the query, but it does not wait
for the query to finish. Therefore your main/test method returns,
bringing down the local cluster, before anything is printed out. You can
e.g. add

    TableResult result = transactions.executeInsert("print_table");
    result.await();

which will wait for the insert to finish.
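
The effect described above is not specific to Flink. As a rough analogy using only the JDK, assume a hypothetical `submit` method that, like `executeInsert`, starts the work and returns a handle immediately. The sketch below compares what the caller sees if it moves on straight away with what it sees after waiting on the handle. `SubmitVersusAwait`, `submit`, and the latch that stands in for cluster scheduling are all illustrative assumptions, not Flink APIs.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

final class SubmitVersusAwait {

    // Hypothetical stand-in for an asynchronous insert: returns at once,
    // while the rows are written later on a background thread.
    static CompletableFuture<Void> submit(List<String> rows, List<String> sink, CountDownLatch scheduled) {
        return CompletableFuture.runAsync(() -> {
            try {
                scheduled.await(); // the job cannot emit rows before it is scheduled
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            sink.addAll(rows);
        });
    }

    // The caller inspects the sink right after submitting, without waiting.
    static int rowsSeenWithoutWaiting(List<String> rows) {
        List<String> sink = new CopyOnWriteArrayList<>();
        CountDownLatch scheduled = new CountDownLatch(1);
        CompletableFuture<Void> job = submit(rows, sink, scheduled);
        int seen = sink.size();   // the job has not been scheduled yet
        scheduled.countDown();
        job.join();               // only to clean up the background task
        return seen;
    }

    // The caller waits for the job to finish before inspecting the sink.
    static int rowsSeenAfterWaiting(List<String> rows) {
        List<String> sink = new CopyOnWriteArrayList<>();
        CountDownLatch scheduled = new CountDownLatch(1);
        CompletableFuture<Void> job = submit(rows, sink, scheduled);
        scheduled.countDown();
        job.join();               // plays the role of waiting on the result handle
        return sink.size();
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("row 1", "row 2", "row 3");
        System.out.println("without waiting: " + rowsSeenWithoutWaiting(rows));
        System.out.println("after waiting:   " + rowsSeenAfterWaiting(rows));
    }
}
```

In this sketch the first call reports 0 rows and the second reports 3. The latch makes the ordering deterministic here; in a real job the outcome without waiting depends on timing.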
The print sink writes directly to stdout/stderr, so none of the logging
configuration applies in this case.
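
The difference between writing to stdout directly and going through a logger can be illustrated without Flink. The sketch below is only an analogy: `java.util.logging` stands in for whichever logging backend is configured (log4j in this thread), and the class name and messages are made up. It captures stdout, turns the logger off, and shows that a direct `System.out.println` still comes through.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.logging.Level;
import java.util.logging.Logger;

final class StdoutBypassesLogging {

    // Runs the action while System.out is redirected into a buffer.
    static String captureStdout(Runnable action) {
        PrintStream original = System.out;
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (PrintStream capture = new PrintStream(buffer, true)) {
            System.setOut(capture);
            action.run();
        } finally {
            System.setOut(original); // always restore the real stdout
        }
        return buffer.toString();
    }

    static String demo() {
        Logger logger = Logger.getLogger("sketch.sink"); // illustrative logger name
        logger.setLevel(Level.OFF);                      // logging configuration: drop everything
        return captureStdout(() -> {
            logger.info("goes through the logger, so it is dropped");
            System.out.println("row written by the sink"); // bypasses the logger entirely
        });
    }

    public static void main(String[] args) {
        System.out.print(demo());
    }
}
```

Changing the logger level or appenders therefore has no effect on what the direct write produces.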
Best,
Dawid
On 29/10/2020 18:29, Ruben Laguna wrote:
> Hi,
>
> Using `mytable.execute().print()` is exactly what I wanted, thanks.
>
> But I'm still curious. I'm just running this locally, in a JUnit test
> case (not using a Flink cluster), just like in
> [flink-playgrounds SpendReportTest][1]. In this scenario, where does the
> TaskManager output go (if there is a TaskManager)?
>
> I just added src/test/resources/log4j.properties with
>
> # Root logger option
> log4j.rootLogger=INFO, stdout
>
> # Direct log messages to stdout
> log4j.appender.stdout=org.apache.log4j.ConsoleAppender
> log4j.appender.stdout.Target=System.out
> log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
> log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
>
> and still I don't see anything from the print sink. I even ran it
> with the debugger, and I can see that although
> PrintSink#getSinkRuntimeProvider is called,
> RowDataPrintFunction#invoke is never called.
>
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/Users/ecerulm/.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/Users/ecerulm/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.12.1/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 2020-10-29 18:18:32 INFO TaskExecutorResourceUtils:188 - The
> configuration option taskmanager.cpu.cores required for local
> execution is not set, setting it to the maximal possible value.
> 2020-10-29 18:18:32 INFO TaskExecutorResourceUtils:188 - The
> configuration option taskmanager.memory.task.heap.size required for
> local execution is not set, setting it to the maximal possible value.
> 2020-10-29 18:18:32 INFO TaskExecutorResourceUtils:188 - The
> configuration option taskmanager.memory.task.off-heap.size required
> for local execution is not set, setting it to the maximal possible
> value.
> 2020-10-29 18:18:32 INFO TaskExecutorResourceUtils:188 - The
> configuration option taskmanager.memory.network.min required for local
> execution is not set, setting it to its default value 64 mb.
> 2020-10-29 18:18:32 INFO TaskExecutorResourceUtils:188 - The
> configuration option taskmanager.memory.network.max required for local
> execution is not set, setting it to its default value 64 mb.
> 2020-10-29 18:18:32 INFO TaskExecutorResourceUtils:188 - The
> configuration option taskmanager.memory.managed.size required for
> local execution is not set, setting it to its default value 128 mb.
> 2020-10-29 18:18:32 INFO MiniCluster:253 - Starting Flink Mini Cluster
> 2020-10-29 18:18:32 INFO MiniCluster:262 - Starting Metrics Registry
> 2020-10-29 18:18:32 INFO MetricRegistryImpl:122 - No metrics reporter
> configured, no metrics will be exposed/reported.
> 2020-10-29 18:18:32 INFO MiniCluster:266 - Starting RPC Service(s)
> 2020-10-29 18:18:32 INFO AkkaRpcServiceUtils:247 - Trying to start
> local actor system
> 2020-10-29 18:18:32 INFO Slf4jLogger:92 - Slf4jLogger started
> 2020-10-29 18:18:32 INFO AkkaRpcServiceUtils:278 - Actor system
> started at akka://flink
> 2020-10-29 18:18:32 INFO AkkaRpcServiceUtils:247 - Trying to start
> local actor system
> 2020-10-29 18:18:32 INFO Slf4jLogger:92 - Slf4jLogger started
> 2020-10-29 18:18:32 INFO AkkaRpcServiceUtils:278 - Actor system
> started at akka://flink-metrics
> 2020-10-29 18:18:32 INFO AkkaRpcService:225 - Starting RPC endpoint
> for org.apache.flink.runtime.metrics.dump.MetricQueryService at
> akka://flink-metrics/user/rpc/MetricQueryService .
> 2020-10-29 18:18:32 INFO MiniCluster:432 - Starting high-availability services
> 2020-10-29 18:18:32 INFO BlobServer:143 - Created BLOB server storage
> directory /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/blobStore-30bb2435-c664-4d1e-8c74-80cb54157860
> 2020-10-29 18:18:32 INFO BlobServer:207 - Started BLOB server at
> 0.0.0.0:50965 - max concurrent requests: 50 - max backlog: 1000
> 2020-10-29 18:18:32 INFO PermanentBlobCache:107 - Created BLOB cache
> storage directory
> /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/blobStore-7a0bd0fc-2864-4a66-afd6-5f117d515b07
> 2020-10-29 18:18:32 INFO TransientBlobCache:107 - Created BLOB cache
> storage directory
> /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/blobStore-fd96dacc-4ce4-447a-ba3f-d2ce453a1d30
> 2020-10-29 18:18:32 INFO MiniCluster:519 - Starting 1 TaskManger(s)
> 2020-10-29 18:18:32 INFO TaskManagerRunner:412 - Starting TaskManager
> with ResourceID: 2358fbac-908d-4aa2-b643-c32d44b40193
> 2020-10-29 18:18:32 INFO TaskManagerServices:411 - Temporary file
> directory '/var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T': total
> 233 GB, usable 25 GB (10.73% usable)
> 2020-10-29 18:18:32 INFO FileChannelManagerImpl:97 -
> FileChannelManager uses directory
> /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-io-9aa7adae-549f-461c-95d8-6fe47f31b695
> for spill files.
> 2020-10-29 18:18:32 INFO FileChannelManagerImpl:97 -
> FileChannelManager uses directory
> /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-netty-shuffle-c8e6cc04-6f93-4f55-a2c5-88891c9e9e49
> for spill files.
> 2020-10-29 18:18:32 INFO NetworkBufferPool:139 - Allocated 64 MB for
> network buffer pool (number of memory segments: 2048, bytes per
> segment: 32768).
> 2020-10-29 18:18:32 INFO NettyShuffleEnvironment:293 - Starting the
> network environment and its components.
> 2020-10-29 18:18:32 INFO KvStateService:89 - Starting the kvState
> service and its components.
> 2020-10-29 18:18:32 INFO AkkaRpcService:225 - Starting RPC endpoint
> for org.apache.flink.runtime.taskexecutor.TaskExecutor at
> akka://flink/user/rpc/taskmanager_0 .
> 2020-10-29 18:18:32 INFO DefaultJobLeaderService:116 - Start job
> leader service.
> 2020-10-29 18:18:32 INFO FileCache:107 - User file cache uses
> directory /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-dist-cache-4ca7b653-8aa5-4967-b23e-574c43ab7b52
> 2020-10-29 18:18:32 INFO DispatcherRestEndpoint:140 - Starting rest endpoint.
> 2020-10-29 18:18:32 INFO DispatcherRestEndpoint:126 - Failed to load
> web based job submission extension. Probable reason: flink-runtime-web
> is not in the classpath.
> 2020-10-29 18:18:32 WARN WebMonitorUtils:85 - Log file environment
> variable 'log.file' is not set.
> 2020-10-29 18:18:32 WARN WebMonitorUtils:91 - JobManager log files
> are unavailable in the web dashboard. Log file location not found in
> environment variable 'log.file' or configuration key 'web.log.path'.
> 2020-10-29 18:18:33 INFO DispatcherRestEndpoint:236 - Rest endpoint
> listening at localhost:50966
> 2020-10-29 18:18:33 INFO EmbeddedLeaderService:302 - Proposing
> leadership to contender http://localhost:50966
> 2020-10-29 18:18:33 INFO DispatcherRestEndpoint:821 -
> http://localhost:50966 was granted leadership with
> leaderSessionID=5d1c56ab-6894-42d9-bc76-716ea59bd473
> 2020-10-29 18:18:33 INFO EmbeddedLeaderService:252 - Received
> confirmation of leadership for leader http://localhost:50966 ,
> session=5d1c56ab-6894-42d9-bc76-716ea59bd473
> 2020-10-29 18:18:33 INFO AkkaRpcService:225 - Starting RPC endpoint
> for org.apache.flink.runtime.resourcemanager.StandaloneResourceManager
> at akka://flink/user/rpc/resourcemanager_1 .
> 2020-10-29 18:18:33 INFO EmbeddedLeaderService:302 - Proposing
> leadership to contender LeaderContender: DefaultDispatcherRunner
> 2020-10-29 18:18:33 INFO EmbeddedLeaderService:302 - Proposing
> leadership to contender LeaderContender: StandaloneResourceManager
> 2020-10-29 18:18:33 INFO StandaloneResourceManager:1026 -
> ResourceManager akka://flink/user/rpc/resourcemanager_1 was granted
> leadership with fencing token a8e96f157326e3f60a2df92bb1364f97
> 2020-10-29 18:18:33 INFO MiniCluster:372 - Flink Mini Cluster started
> successfully
> 2020-10-29 18:18:33 INFO SlotManagerImpl:284 - Starting the SlotManager.
> 2020-10-29 18:18:33 INFO SessionDispatcherLeaderProcess:102 - Start
> SessionDispatcherLeaderProcess.
> 2020-10-29 18:18:33 INFO SessionDispatcherLeaderProcess:120 - Recover
> all persisted job graphs.
> 2020-10-29 18:18:33 INFO SessionDispatcherLeaderProcess:128 -
> Successfully recovered 0 persisted job graphs.
> 2020-10-29 18:18:33 INFO EmbeddedLeaderService:252 - Received
> confirmation of leadership for leader
> akka://flink/user/rpc/resourcemanager_1 ,
> session=0a2df92b-b136-4f97-a8e9-6f157326e3f6
> 2020-10-29 18:18:33 INFO TaskExecutor:1128 - Connecting to
> ResourceManager
> akka://flink/user/rpc/resourcemanager_1(a8e96f157326e3f60a2df92bb1364f97).
> 2020-10-29 18:18:33 INFO AkkaRpcService:225 - Starting RPC endpoint
> for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at
> akka://flink/user/rpc/dispatcher_2 .
> 2020-10-29 18:18:33 INFO EmbeddedLeaderService:252 - Received
> confirmation of leadership for leader
> akka://flink/user/rpc/dispatcher_2 ,
> session=3ce5d1bf-2d6d-49d6-b315-717f66748938
> 2020-10-29 18:18:33 INFO TaskExecutor:155 - Resolved ResourceManager
> address, beginning registration
> 2020-10-29 18:18:33 INFO StandaloneResourceManager:821 - Registering
> TaskManager with ResourceID 2358fbac-908d-4aa2-b643-c32d44b40193
> (akka://flink/user/rpc/taskmanager_0) at ResourceManager
> 2020-10-29 18:18:33 INFO TaskExecutor:84 - Successful registration at
> resource manager akka://flink/user/rpc/resourcemanager_1 under
> registration id 0be76fda7dfc8204153de66b83ba5621.
> 2020-10-29 18:18:33 INFO StandaloneDispatcher:295 - Received JobGraph
> submission 1fd25ab4d3d51009542ebda2bbadb55d
> (insert-into_default_catalog.default_database.print_table).
> 2020-10-29 18:18:33 INFO StandaloneDispatcher:352 - Submitting job
> 1fd25ab4d3d51009542ebda2bbadb55d
> (insert-into_default_catalog.default_database.print_table).
> 2020-10-29 18:18:33 INFO AkkaRpcService:225 - Starting RPC endpoint
> for org.apache.flink.runtime.jobmaster.JobMaster at
> akka://flink/user/rpc/jobmanager_3 .
> 2020-10-29 18:18:33 INFO JobMaster:288 - Initializing job
> insert-into_default_catalog.default_database.print_table
> (1fd25ab4d3d51009542ebda2bbadb55d).
> 2020-10-29 18:18:33 INFO JobMaster:84 - Using restart back off time
> strategy NoRestartBackoffTimeStrategy for
> insert-into_default_catalog.default_database.print_table
> (1fd25ab4d3d51009542ebda2bbadb55d).
> 2020-10-29 18:18:33 INFO JobMaster:211 - Running initialization on
> master for job insert-into_default_catalog.default_database.print_table
> (1fd25ab4d3d51009542ebda2bbadb55d).
> 2020-10-29 18:18:33 INFO JobMaster:229 - Successfully ran
> initialization on master in 3 ms.
> 2020-10-29 18:18:33 INFO DefaultExecutionTopology:111 - Built 1
> pipelined regions in 0 ms
> 2020-10-29 18:18:33 INFO JobMaster:231 - No state backend has been
> configured, using default (Memory / JobManager) MemoryStateBackend
> (data in heap memory / checkpoints to JobManager) (checkpoints:
> 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
> 2020-10-29 18:18:33 INFO JobMaster:165 - Using failover strategy
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@5da7a3b6
> for insert-into_default_catalog.default_database.print_table
> (1fd25ab4d3d51009542ebda2bbadb55d).
> 2020-10-29 18:18:33 INFO EmbeddedLeaderService:302 - Proposing
> leadership to contender akka://flink/user/rpc/jobmanager_3
> 2020-10-29 18:18:33 INFO JobManagerRunnerImpl:305 - JobManager runner
> for job insert-into_default_catalog.default_database.print_table
> (1fd25ab4d3d51009542ebda2bbadb55d) was granted leadership with session
> id 47a72e91-c0bf-4b40-8b9f-639f07344b3f at
> akka://flink/user/rpc/jobmanager_3.
> 2020-10-29 18:18:33 INFO JobMaster:799 - Starting execution of job
> insert-into_default_catalog.default_database.print_table
> (1fd25ab4d3d51009542ebda2bbadb55d) under job master id
> 8b9f639f07344b3f47a72e91c0bf4b40.
> 2020-10-29 18:18:33 INFO JobMaster:197 - Starting scheduling with
> scheduling strategy
> [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
> 2020-10-29 18:18:33 INFO ExecutionGraph:1253 - Job
> insert-into_default_catalog.default_database.print_table
> (1fd25ab4d3d51009542ebda2bbadb55d) switched from state CREATED to
> RUNNING.
> 2020-10-29 18:18:33 INFO ExecutionGraph:1584 - Source:
> Values(tuples=[[{ 0 }]]) -> (Calc(select=[CAST(1:BIGINT) AS
> account_id, CAST(188:BIGINT) AS amount, CAST(CAST(2020-01-01
> 00:12:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(2:BIGINT) AS account_id, CAST(374:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:47:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(3:BIGINT) AS account_id, CAST(112:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:36:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(4:BIGINT) AS account_id, CAST(478:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:03:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(5:BIGINT) AS account_id, CAST(208:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:08:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(1:BIGINT) AS account_id, CAST(379:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:53:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(2:BIGINT) AS account_id, CAST(351:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:32:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(3:BIGINT) AS account_id, CAST(320:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:31:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(4:BIGINT) AS account_id, CAST(259:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:19:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(5:BIGINT) AS account_id, CAST(273:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:42:00:TIMESTAMP(3))) AS transaction_time]))
> (1/1) (1fd25ab4d3d51009542ebda2bbadb55d_d07bb0f8535d5573ba7aa0f9242a6583_0_0)
> switched from CREATED to SCHEDULED.
> 2020-10-29 18:18:33 INFO ExecutionGraph:1584 - Sink:
> Sink(table=[default_catalog.default_database.print_table],
> fields=[account_id, amount, transaction_time]) (1/1)
> (1fd25ab4d3d51009542ebda2bbadb55d_4b71d4c67c3b183d6f63a06700c86645_0_0)
> switched from CREATED to SCHEDULED.
> 2020-10-29 18:18:33 INFO SlotPoolImpl:385 - Cannot serve slot
> request, no ResourceManager connected. Adding as pending request
> [SlotRequestId{3e5c5fdf21b3314a9289b257aeec8c2d}]
> 2020-10-29 18:18:33 INFO EmbeddedLeaderService:252 - Received
> confirmation of leadership for leader
> akka://flink/user/rpc/jobmanager_3 ,
> session=47a72e91-c0bf-4b40-8b9f-639f07344b3f
> 2020-10-29 18:18:33 INFO JobMaster:1031 - Connecting to
> ResourceManager
> akka://flink/user/rpc/resourcemanager_1(a8e96f157326e3f60a2df92bb1364f97)
> 2020-10-29 18:18:33 INFO JobMaster:155 - Resolved ResourceManager
> address, beginning registration
> 2020-10-29 18:18:33 INFO StandaloneResourceManager:330 - Registering
> job manager 8b9f639f07344b3f47a72e91c0bf4b40@akka://flink/user/rpc/jobmanager_3
> for job 1fd25ab4d3d51009542ebda2bbadb55d.
> 2020-10-29 18:18:33 INFO StandaloneResourceManager:765 - Registered
> job manager 8b9f639f07344b3f47a72e91c0bf4b40@akka://flink/user/rpc/jobmanager_3
> for job 1fd25ab4d3d51009542ebda2bbadb55d.
> 2020-10-29 18:18:33 INFO JobMaster:1053 - JobManager successfully
> registered at ResourceManager, leader id:
> a8e96f157326e3f60a2df92bb1364f97.
> 2020-10-29 18:18:33 INFO SlotPoolImpl:347 - Requesting new slot
> [SlotRequestId{3e5c5fdf21b3314a9289b257aeec8c2d}] and profile
> ResourceProfile{UNKNOWN} with allocation id
> 45ea6506d06f73f61a36db764ce07ba7 from resource manager.
> 2020-10-29 18:18:33 INFO StandaloneResourceManager:464 - Request slot
> with profile ResourceProfile{UNKNOWN} for job
> 1fd25ab4d3d51009542ebda2bbadb55d with allocation id
> 45ea6506d06f73f61a36db764ce07ba7.
> 2020-10-29 18:18:33 INFO TaskExecutor:908 - Receive slot request
> 45ea6506d06f73f61a36db764ce07ba7 for job
> 1fd25ab4d3d51009542ebda2bbadb55d from resource manager with leader id
> a8e96f157326e3f60a2df92bb1364f97.
> 2020-10-29 18:18:33 INFO TaskExecutor:976 - Allocated slot for
> 45ea6506d06f73f61a36db764ce07ba7.
> 2020-10-29 18:18:33 INFO DefaultJobLeaderService:172 - Add job
> 1fd25ab4d3d51009542ebda2bbadb55d for job leader monitoring.
> 2020-10-29 18:18:33 INFO DefaultJobLeaderService:314 - Try to
> register at job manager akka://flink/user/rpc/jobmanager_3 with leader
> id 47a72e91-c0bf-4b40-8b9f-639f07344b3f.
> 2020-10-29 18:18:33 INFO DefaultJobLeaderService:155 - Resolved
> JobManager address, beginning registration
> 2020-10-29 18:18:33 INFO DefaultJobLeaderService:369 - Successful
> registration at job manager akka://flink/user/rpc/jobmanager_3 for job
> 1fd25ab4d3d51009542ebda2bbadb55d.
> 2020-10-29 18:18:33 INFO TaskExecutor:1379 - Establish JobManager
> connection for job 1fd25ab4d3d51009542ebda2bbadb55d.
> 2020-10-29 18:18:33 INFO TaskExecutor:1278 - Offer reserved slots to
> the leader of job 1fd25ab4d3d51009542ebda2bbadb55d.
> 2020-10-29 18:18:33 INFO PermanentBlobCache:251 - Shutting down BLOB cache
> 2020-10-29 18:18:33 INFO TaskExecutorLocalStateStoresManager:213 -
> Shutting down TaskExecutorLocalStateStoresManager.
> 2020-10-29 18:18:33 INFO TransientBlobCache:251 - Shutting down BLOB cache
> 2020-10-29 18:18:33 INFO ExecutionGraph:1584 - Source:
> Values(tuples=[[{ 0 }]]) -> (Calc(select=[CAST(1:BIGINT) AS
> account_id, CAST(188:BIGINT) AS amount, CAST(CAST(2020-01-01
> 00:12:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(2:BIGINT) AS account_id, CAST(374:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:47:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(3:BIGINT) AS account_id, CAST(112:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:36:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(4:BIGINT) AS account_id, CAST(478:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:03:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(5:BIGINT) AS account_id, CAST(208:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:08:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(1:BIGINT) AS account_id, CAST(379:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:53:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(2:BIGINT) AS account_id, CAST(351:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:32:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(3:BIGINT) AS account_id, CAST(320:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:31:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(4:BIGINT) AS account_id, CAST(259:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:19:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(5:BIGINT) AS account_id, CAST(273:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:42:00:TIMESTAMP(3))) AS transaction_time]))
> (1/1) (1fd25ab4d3d51009542ebda2bbadb55d_d07bb0f8535d5573ba7aa0f9242a6583_0_0)
> switched from SCHEDULED to DEPLOYING.
> 2020-10-29 18:18:33 INFO ExecutionGraph:725 - Deploying Source:
> Values(tuples=[[{ 0 }]]) -> (Calc(select=[CAST(1:BIGINT) AS
> account_id, CAST(188:BIGINT) AS amount, CAST(CAST(2020-01-01
> 00:12:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(2:BIGINT) AS account_id, CAST(374:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:47:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(3:BIGINT) AS account_id, CAST(112:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:36:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(4:BIGINT) AS account_id, CAST(478:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:03:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(5:BIGINT) AS account_id, CAST(208:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:08:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(1:BIGINT) AS account_id, CAST(379:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:53:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(2:BIGINT) AS account_id, CAST(351:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:32:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(3:BIGINT) AS account_id, CAST(320:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:31:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(4:BIGINT) AS account_id, CAST(259:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:19:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(5:BIGINT) AS account_id, CAST(273:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:42:00:TIMESTAMP(3))) AS transaction_time]))
> (1/1) (attempt #0) with attempt id
> 1fd25ab4d3d51009542ebda2bbadb55d_d07bb0f8535d5573ba7aa0f9242a6583_0_0
> to 2358fbac-908d-4aa2-b643-c32d44b40193 @ localhost (dataPort=-1) with
> allocation id 45ea6506d06f73f61a36db764ce07ba7
> 2020-10-29 18:18:33 INFO ExecutionGraph:1584 - Sink:
> Sink(table=[default_catalog.default_database.print_table],
> fields=[account_id, amount, transaction_time]) (1/1)
> (1fd25ab4d3d51009542ebda2bbadb55d_4b71d4c67c3b183d6f63a06700c86645_0_0)
> switched from SCHEDULED to DEPLOYING.
> 2020-10-29 18:18:33 INFO ExecutionGraph:725 - Deploying Sink:
> Sink(table=[default_catalog.default_database.print_table],
> fields=[account_id, amount, transaction_time]) (1/1) (attempt #0) with
> attempt id 1fd25ab4d3d51009542ebda2bbadb55d_4b71d4c67c3b183d6f63a06700c86645_0_0
> to 2358fbac-908d-4aa2-b643-c32d44b40193 @ localhost (dataPort=-1) with
> allocation id 45ea6506d06f73f61a36db764ce07ba7
> 2020-10-29 18:18:33 INFO TaskSlotTableImpl:361 - Activate slot
> 45ea6506d06f73f61a36db764ce07ba7.
> 2020-10-29 18:18:33 INFO FileChannelManagerImpl:146 -
> FileChannelManager removed spill file directory
> /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-io-9aa7adae-549f-461c-95d8-6fe47f31b695
> 2020-10-29 18:18:33 INFO BlobServer:348 - Stopped BLOB server at 0.0.0.0:50965
> 2020-10-29 18:18:33 INFO FileChannelManagerImpl:146 -
> FileChannelManager removed spill file directory
> /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-netty-shuffle-c8e6cc04-6f93-4f55-a2c5-88891c9e9e49
> 2020-10-29 18:18:33 INFO FileCache:153 - removed file cache directory
> /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-dist-cache-4ca7b653-8aa5-4967-b23e-574c43ab7b52
>
> Process finished with exit code 0
>
>
> [1]: https://github.com/apache/flink-playgrounds/blob/master/table-walkthrough/src/test/java/org/apache/flink/playgrounds/spendreport/SpendReportTest.java
>
> On Thu, Oct 29, 2020 at 4:53 PM Dawid Wysakowicz <dw...@apache.org> wrote:
>> You should be able to use the "print" sink. Remember though that the
>> "print" sink prints into the stdout/stderr of TaskManagers, not the
>> Client, where you submit the query. This is different from the
>> TableResult, which collects results in the client. BTW, for printing you
>> can use TableResult#print, which will nicely format your results.
>>
>> Best,
>>
>> Dawid
Re: Table Print SQL Connector
Posted by Ruben Laguna <ru...@gmail.com>.
Hi,
Using `mytable.execute().print()` is exactly what I wanted, thanks.
But I'm still curious. I'm just running this locally, in a JUnit test
case (not using a Flink cluster), just like in
[flink-playgrounds SpendReportTest][1]. In this scenario, where does the
TaskManager output go (if there is a TaskManager)?
I just added src/test/resources/log4j.properties with

    # Root logger option
    log4j.rootLogger=INFO, stdout

    # Direct log messages to stdout
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.Target=System.out
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

and still I don't see anything from the print sink. I even ran it
with the debugger, and I can see that although
PrintSink#getSinkRuntimeProvider is called,
RowDataPrintFunction#invoke is never called.
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/Users/ecerulm/.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/Users/ecerulm/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.12.1/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2020-10-29 18:18:32 INFO TaskExecutorResourceUtils:188 - The
configuration option taskmanager.cpu.cores required for local
execution is not set, setting it to the maximal possible value.
2020-10-29 18:18:32 INFO TaskExecutorResourceUtils:188 - The
configuration option taskmanager.memory.task.heap.size required for
local execution is not set, setting it to the maximal possible value.
2020-10-29 18:18:32 INFO TaskExecutorResourceUtils:188 - The
configuration option taskmanager.memory.task.off-heap.size required
for local execution is not set, setting it to the maximal possible
value.
2020-10-29 18:18:32 INFO TaskExecutorResourceUtils:188 - The
configuration option taskmanager.memory.network.min required for local
execution is not set, setting it to its default value 64 mb.
2020-10-29 18:18:32 INFO TaskExecutorResourceUtils:188 - The
configuration option taskmanager.memory.network.max required for local
execution is not set, setting it to its default value 64 mb.
2020-10-29 18:18:32 INFO TaskExecutorResourceUtils:188 - The
configuration option taskmanager.memory.managed.size required for
local execution is not set, setting it to its default value 128 mb.
2020-10-29 18:18:32 INFO MiniCluster:253 - Starting Flink Mini Cluster
2020-10-29 18:18:32 INFO MiniCluster:262 - Starting Metrics Registry
2020-10-29 18:18:32 INFO MetricRegistryImpl:122 - No metrics reporter
configured, no metrics will be exposed/reported.
2020-10-29 18:18:32 INFO MiniCluster:266 - Starting RPC Service(s)
2020-10-29 18:18:32 INFO AkkaRpcServiceUtils:247 - Trying to start
local actor system
2020-10-29 18:18:32 INFO Slf4jLogger:92 - Slf4jLogger started
2020-10-29 18:18:32 INFO AkkaRpcServiceUtils:278 - Actor system
started at akka://flink
2020-10-29 18:18:32 INFO AkkaRpcServiceUtils:247 - Trying to start
local actor system
2020-10-29 18:18:32 INFO Slf4jLogger:92 - Slf4jLogger started
2020-10-29 18:18:32 INFO AkkaRpcServiceUtils:278 - Actor system
started at akka://flink-metrics
2020-10-29 18:18:32 INFO AkkaRpcService:225 - Starting RPC endpoint
for org.apache.flink.runtime.metrics.dump.MetricQueryService at
akka://flink-metrics/user/rpc/MetricQueryService .
2020-10-29 18:18:32 INFO MiniCluster:432 - Starting high-availability services
2020-10-29 18:18:32 INFO BlobServer:143 - Created BLOB server storage
directory /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/blobStore-30bb2435-c664-4d1e-8c74-80cb54157860
2020-10-29 18:18:32 INFO BlobServer:207 - Started BLOB server at
0.0.0.0:50965 - max concurrent requests: 50 - max backlog: 1000
2020-10-29 18:18:32 INFO PermanentBlobCache:107 - Created BLOB cache
storage directory
/var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/blobStore-7a0bd0fc-2864-4a66-afd6-5f117d515b07
2020-10-29 18:18:32 INFO TransientBlobCache:107 - Created BLOB cache
storage directory
/var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/blobStore-fd96dacc-4ce4-447a-ba3f-d2ce453a1d30
2020-10-29 18:18:32 INFO MiniCluster:519 - Starting 1 TaskManger(s)
2020-10-29 18:18:32 INFO TaskManagerRunner:412 - Starting TaskManager
with ResourceID: 2358fbac-908d-4aa2-b643-c32d44b40193
2020-10-29 18:18:32 INFO TaskManagerServices:411 - Temporary file
directory '/var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T': total
233 GB, usable 25 GB (10.73% usable)
2020-10-29 18:18:32 INFO FileChannelManagerImpl:97 -
FileChannelManager uses directory
/var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-io-9aa7adae-549f-461c-95d8-6fe47f31b695
for spill files.
2020-10-29 18:18:32 INFO FileChannelManagerImpl:97 -
FileChannelManager uses directory
/var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-netty-shuffle-c8e6cc04-6f93-4f55-a2c5-88891c9e9e49
for spill files.
2020-10-29 18:18:32 INFO NetworkBufferPool:139 - Allocated 64 MB for
network buffer pool (number of memory segments: 2048, bytes per
segment: 32768).
2020-10-29 18:18:32 INFO NettyShuffleEnvironment:293 - Starting the
network environment and its components.
2020-10-29 18:18:32 INFO KvStateService:89 - Starting the kvState
service and its components.
2020-10-29 18:18:32 INFO AkkaRpcService:225 - Starting RPC endpoint
for org.apache.flink.runtime.taskexecutor.TaskExecutor at
akka://flink/user/rpc/taskmanager_0 .
2020-10-29 18:18:32 INFO DefaultJobLeaderService:116 - Start job
leader service.
2020-10-29 18:18:32 INFO FileCache:107 - User file cache uses
directory /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-dist-cache-4ca7b653-8aa5-4967-b23e-574c43ab7b52
2020-10-29 18:18:32 INFO DispatcherRestEndpoint:140 - Starting rest endpoint.
2020-10-29 18:18:32 INFO DispatcherRestEndpoint:126 - Failed to load
web based job submission extension. Probable reason: flink-runtime-web
is not in the classpath.
2020-10-29 18:18:32 WARN WebMonitorUtils:85 - Log file environment
variable 'log.file' is not set.
2020-10-29 18:18:32 WARN WebMonitorUtils:91 - JobManager log files
are unavailable in the web dashboard. Log file location not found in
environment variable 'log.file' or configuration key 'web.log.path'.
2020-10-29 18:18:33 INFO DispatcherRestEndpoint:236 - Rest endpoint
listening at localhost:50966
2020-10-29 18:18:33 INFO EmbeddedLeaderService:302 - Proposing
leadership to contender http://localhost:50966
2020-10-29 18:18:33 INFO DispatcherRestEndpoint:821 -
http://localhost:50966 was granted leadership with
leaderSessionID=5d1c56ab-6894-42d9-bc76-716ea59bd473
2020-10-29 18:18:33 INFO EmbeddedLeaderService:252 - Received
confirmation of leadership for leader http://localhost:50966 ,
session=5d1c56ab-6894-42d9-bc76-716ea59bd473
2020-10-29 18:18:33 INFO AkkaRpcService:225 - Starting RPC endpoint
for org.apache.flink.runtime.resourcemanager.StandaloneResourceManager
at akka://flink/user/rpc/resourcemanager_1 .
2020-10-29 18:18:33 INFO EmbeddedLeaderService:302 - Proposing
leadership to contender LeaderContender: DefaultDispatcherRunner
2020-10-29 18:18:33 INFO EmbeddedLeaderService:302 - Proposing
leadership to contender LeaderContender: StandaloneResourceManager
2020-10-29 18:18:33 INFO StandaloneResourceManager:1026 -
ResourceManager akka://flink/user/rpc/resourcemanager_1 was granted
leadership with fencing token a8e96f157326e3f60a2df92bb1364f97
2020-10-29 18:18:33 INFO MiniCluster:372 - Flink Mini Cluster started
successfully
2020-10-29 18:18:33 INFO SlotManagerImpl:284 - Starting the SlotManager.
2020-10-29 18:18:33 INFO SessionDispatcherLeaderProcess:102 - Start
SessionDispatcherLeaderProcess.
2020-10-29 18:18:33 INFO SessionDispatcherLeaderProcess:120 - Recover
all persisted job graphs.
2020-10-29 18:18:33 INFO SessionDispatcherLeaderProcess:128 -
Successfully recovered 0 persisted job graphs.
2020-10-29 18:18:33 INFO EmbeddedLeaderService:252 - Received
confirmation of leadership for leader
akka://flink/user/rpc/resourcemanager_1 ,
session=0a2df92b-b136-4f97-a8e9-6f157326e3f6
2020-10-29 18:18:33 INFO TaskExecutor:1128 - Connecting to
ResourceManager
akka://flink/user/rpc/resourcemanager_1(a8e96f157326e3f60a2df92bb1364f97).
2020-10-29 18:18:33 INFO AkkaRpcService:225 - Starting RPC endpoint
for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at
akka://flink/user/rpc/dispatcher_2 .
2020-10-29 18:18:33 INFO EmbeddedLeaderService:252 - Received
confirmation of leadership for leader
akka://flink/user/rpc/dispatcher_2 ,
session=3ce5d1bf-2d6d-49d6-b315-717f66748938
2020-10-29 18:18:33 INFO TaskExecutor:155 - Resolved ResourceManager
address, beginning registration
2020-10-29 18:18:33 INFO StandaloneResourceManager:821 - Registering
TaskManager with ResourceID 2358fbac-908d-4aa2-b643-c32d44b40193
(akka://flink/user/rpc/taskmanager_0) at ResourceManager
2020-10-29 18:18:33 INFO TaskExecutor:84 - Successful registration at
resource manager akka://flink/user/rpc/resourcemanager_1 under
registration id 0be76fda7dfc8204153de66b83ba5621.
2020-10-29 18:18:33 INFO StandaloneDispatcher:295 - Received JobGraph
submission 1fd25ab4d3d51009542ebda2bbadb55d
(insert-into_default_catalog.default_database.print_table).
2020-10-29 18:18:33 INFO StandaloneDispatcher:352 - Submitting job
1fd25ab4d3d51009542ebda2bbadb55d
(insert-into_default_catalog.default_database.print_table).
2020-10-29 18:18:33 INFO AkkaRpcService:225 - Starting RPC endpoint
for org.apache.flink.runtime.jobmaster.JobMaster at
akka://flink/user/rpc/jobmanager_3 .
2020-10-29 18:18:33 INFO JobMaster:288 - Initializing job
insert-into_default_catalog.default_database.print_table
(1fd25ab4d3d51009542ebda2bbadb55d).
2020-10-29 18:18:33 INFO JobMaster:84 - Using restart back off time
strategy NoRestartBackoffTimeStrategy for
insert-into_default_catalog.default_database.print_table
(1fd25ab4d3d51009542ebda2bbadb55d).
2020-10-29 18:18:33 INFO JobMaster:211 - Running initialization on
master for job insert-into_default_catalog.default_database.print_table
(1fd25ab4d3d51009542ebda2bbadb55d).
2020-10-29 18:18:33 INFO JobMaster:229 - Successfully ran
initialization on master in 3 ms.
2020-10-29 18:18:33 INFO DefaultExecutionTopology:111 - Built 1
pipelined regions in 0 ms
2020-10-29 18:18:33 INFO JobMaster:231 - No state backend has been
configured, using default (Memory / JobManager) MemoryStateBackend
(data in heap memory / checkpoints to JobManager) (checkpoints:
'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
2020-10-29 18:18:33 INFO JobMaster:165 - Using failover strategy
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@5da7a3b6
for insert-into_default_catalog.default_database.print_table
(1fd25ab4d3d51009542ebda2bbadb55d).
2020-10-29 18:18:33 INFO EmbeddedLeaderService:302 - Proposing
leadership to contender akka://flink/user/rpc/jobmanager_3
2020-10-29 18:18:33 INFO JobManagerRunnerImpl:305 - JobManager runner
for job insert-into_default_catalog.default_database.print_table
(1fd25ab4d3d51009542ebda2bbadb55d) was granted leadership with session
id 47a72e91-c0bf-4b40-8b9f-639f07344b3f at
akka://flink/user/rpc/jobmanager_3.
2020-10-29 18:18:33 INFO JobMaster:799 - Starting execution of job
insert-into_default_catalog.default_database.print_table
(1fd25ab4d3d51009542ebda2bbadb55d) under job master id
8b9f639f07344b3f47a72e91c0bf4b40.
2020-10-29 18:18:33 INFO JobMaster:197 - Starting scheduling with
scheduling strategy
[org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
2020-10-29 18:18:33 INFO ExecutionGraph:1253 - Job
insert-into_default_catalog.default_database.print_table
(1fd25ab4d3d51009542ebda2bbadb55d) switched from state CREATED to
RUNNING.
2020-10-29 18:18:33 INFO ExecutionGraph:1584 - Source:
Values(tuples=[[{ 0 }]]) -> (Calc(select=[CAST(1:BIGINT) AS
account_id, CAST(188:BIGINT) AS amount, CAST(CAST(2020-01-01
00:12:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(2:BIGINT) AS account_id, CAST(374:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:47:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(3:BIGINT) AS account_id, CAST(112:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:36:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(4:BIGINT) AS account_id, CAST(478:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:03:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(5:BIGINT) AS account_id, CAST(208:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:08:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(1:BIGINT) AS account_id, CAST(379:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:53:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(2:BIGINT) AS account_id, CAST(351:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:32:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(3:BIGINT) AS account_id, CAST(320:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:31:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(4:BIGINT) AS account_id, CAST(259:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:19:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(5:BIGINT) AS account_id, CAST(273:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:42:00:TIMESTAMP(3))) AS transaction_time]))
(1/1) (1fd25ab4d3d51009542ebda2bbadb55d_d07bb0f8535d5573ba7aa0f9242a6583_0_0)
switched from CREATED to SCHEDULED.
2020-10-29 18:18:33 INFO ExecutionGraph:1584 - Sink:
Sink(table=[default_catalog.default_database.print_table],
fields=[account_id, amount, transaction_time]) (1/1)
(1fd25ab4d3d51009542ebda2bbadb55d_4b71d4c67c3b183d6f63a06700c86645_0_0)
switched from CREATED to SCHEDULED.
2020-10-29 18:18:33 INFO SlotPoolImpl:385 - Cannot serve slot
request, no ResourceManager connected. Adding as pending request
[SlotRequestId{3e5c5fdf21b3314a9289b257aeec8c2d}]
2020-10-29 18:18:33 INFO EmbeddedLeaderService:252 - Received
confirmation of leadership for leader
akka://flink/user/rpc/jobmanager_3 ,
session=47a72e91-c0bf-4b40-8b9f-639f07344b3f
2020-10-29 18:18:33 INFO JobMaster:1031 - Connecting to
ResourceManager
akka://flink/user/rpc/resourcemanager_1(a8e96f157326e3f60a2df92bb1364f97)
2020-10-29 18:18:33 INFO JobMaster:155 - Resolved ResourceManager
address, beginning registration
2020-10-29 18:18:33 INFO StandaloneResourceManager:330 - Registering
job manager 8b9f639f07344b3f47a72e91c0bf4b40@akka://flink/user/rpc/jobmanager_3
for job 1fd25ab4d3d51009542ebda2bbadb55d.
2020-10-29 18:18:33 INFO StandaloneResourceManager:765 - Registered
job manager 8b9f639f07344b3f47a72e91c0bf4b40@akka://flink/user/rpc/jobmanager_3
for job 1fd25ab4d3d51009542ebda2bbadb55d.
2020-10-29 18:18:33 INFO JobMaster:1053 - JobManager successfully
registered at ResourceManager, leader id:
a8e96f157326e3f60a2df92bb1364f97.
2020-10-29 18:18:33 INFO SlotPoolImpl:347 - Requesting new slot
[SlotRequestId{3e5c5fdf21b3314a9289b257aeec8c2d}] and profile
ResourceProfile{UNKNOWN} with allocation id
45ea6506d06f73f61a36db764ce07ba7 from resource manager.
2020-10-29 18:18:33 INFO StandaloneResourceManager:464 - Request slot
with profile ResourceProfile{UNKNOWN} for job
1fd25ab4d3d51009542ebda2bbadb55d with allocation id
45ea6506d06f73f61a36db764ce07ba7.
2020-10-29 18:18:33 INFO TaskExecutor:908 - Receive slot request
45ea6506d06f73f61a36db764ce07ba7 for job
1fd25ab4d3d51009542ebda2bbadb55d from resource manager with leader id
a8e96f157326e3f60a2df92bb1364f97.
2020-10-29 18:18:33 INFO TaskExecutor:976 - Allocated slot for
45ea6506d06f73f61a36db764ce07ba7.
2020-10-29 18:18:33 INFO DefaultJobLeaderService:172 - Add job
1fd25ab4d3d51009542ebda2bbadb55d for job leader monitoring.
2020-10-29 18:18:33 INFO DefaultJobLeaderService:314 - Try to
register at job manager akka://flink/user/rpc/jobmanager_3 with leader
id 47a72e91-c0bf-4b40-8b9f-639f07344b3f.
2020-10-29 18:18:33 INFO DefaultJobLeaderService:155 - Resolved
JobManager address, beginning registration
2020-10-29 18:18:33 INFO DefaultJobLeaderService:369 - Successful
registration at job manager akka://flink/user/rpc/jobmanager_3 for job
1fd25ab4d3d51009542ebda2bbadb55d.
2020-10-29 18:18:33 INFO TaskExecutor:1379 - Establish JobManager
connection for job 1fd25ab4d3d51009542ebda2bbadb55d.
2020-10-29 18:18:33 INFO TaskExecutor:1278 - Offer reserved slots to
the leader of job 1fd25ab4d3d51009542ebda2bbadb55d.
2020-10-29 18:18:33 INFO PermanentBlobCache:251 - Shutting down BLOB cache
2020-10-29 18:18:33 INFO TaskExecutorLocalStateStoresManager:213 -
Shutting down TaskExecutorLocalStateStoresManager.
2020-10-29 18:18:33 INFO TransientBlobCache:251 - Shutting down BLOB cache
2020-10-29 18:18:33 INFO ExecutionGraph:1584 - Source:
Values(tuples=[[{ 0 }]]) -> (Calc(select=[CAST(1:BIGINT) AS
account_id, CAST(188:BIGINT) AS amount, CAST(CAST(2020-01-01
00:12:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(2:BIGINT) AS account_id, CAST(374:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:47:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(3:BIGINT) AS account_id, CAST(112:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:36:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(4:BIGINT) AS account_id, CAST(478:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:03:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(5:BIGINT) AS account_id, CAST(208:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:08:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(1:BIGINT) AS account_id, CAST(379:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:53:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(2:BIGINT) AS account_id, CAST(351:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:32:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(3:BIGINT) AS account_id, CAST(320:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:31:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(4:BIGINT) AS account_id, CAST(259:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:19:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(5:BIGINT) AS account_id, CAST(273:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:42:00:TIMESTAMP(3))) AS transaction_time]))
(1/1) (1fd25ab4d3d51009542ebda2bbadb55d_d07bb0f8535d5573ba7aa0f9242a6583_0_0)
switched from SCHEDULED to DEPLOYING.
2020-10-29 18:18:33 INFO ExecutionGraph:725 - Deploying Source:
Values(tuples=[[{ 0 }]]) -> (Calc(select=[CAST(1:BIGINT) AS
account_id, CAST(188:BIGINT) AS amount, CAST(CAST(2020-01-01
00:12:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(2:BIGINT) AS account_id, CAST(374:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:47:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(3:BIGINT) AS account_id, CAST(112:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:36:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(4:BIGINT) AS account_id, CAST(478:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:03:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(5:BIGINT) AS account_id, CAST(208:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:08:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(1:BIGINT) AS account_id, CAST(379:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:53:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(2:BIGINT) AS account_id, CAST(351:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:32:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(3:BIGINT) AS account_id, CAST(320:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:31:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(4:BIGINT) AS account_id, CAST(259:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:19:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(5:BIGINT) AS account_id, CAST(273:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:42:00:TIMESTAMP(3))) AS transaction_time]))
(1/1) (attempt #0) with attempt id
1fd25ab4d3d51009542ebda2bbadb55d_d07bb0f8535d5573ba7aa0f9242a6583_0_0
to 2358fbac-908d-4aa2-b643-c32d44b40193 @ localhost (dataPort=-1) with
allocation id 45ea6506d06f73f61a36db764ce07ba7
2020-10-29 18:18:33 INFO ExecutionGraph:1584 - Sink:
Sink(table=[default_catalog.default_database.print_table],
fields=[account_id, amount, transaction_time]) (1/1)
(1fd25ab4d3d51009542ebda2bbadb55d_4b71d4c67c3b183d6f63a06700c86645_0_0)
switched from SCHEDULED to DEPLOYING.
2020-10-29 18:18:33 INFO ExecutionGraph:725 - Deploying Sink:
Sink(table=[default_catalog.default_database.print_table],
fields=[account_id, amount, transaction_time]) (1/1) (attempt #0) with
attempt id 1fd25ab4d3d51009542ebda2bbadb55d_4b71d4c67c3b183d6f63a06700c86645_0_0
to 2358fbac-908d-4aa2-b643-c32d44b40193 @ localhost (dataPort=-1) with
allocation id 45ea6506d06f73f61a36db764ce07ba7
2020-10-29 18:18:33 INFO TaskSlotTableImpl:361 - Activate slot
45ea6506d06f73f61a36db764ce07ba7.
2020-10-29 18:18:33 INFO FileChannelManagerImpl:146 -
FileChannelManager removed spill file directory
/var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-io-9aa7adae-549f-461c-95d8-6fe47f31b695
2020-10-29 18:18:33 INFO BlobServer:348 - Stopped BLOB server at 0.0.0.0:50965
2020-10-29 18:18:33 INFO FileChannelManagerImpl:146 -
FileChannelManager removed spill file directory
/var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-netty-shuffle-c8e6cc04-6f93-4f55-a2c5-88891c9e9e49
2020-10-29 18:18:33 INFO FileCache:153 - removed file cache directory
/var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-dist-cache-4ca7b653-8aa5-4967-b23e-574c43ab7b52
Process finished with exit code 0
[1]: https://github.com/apache/flink-playgrounds/blob/master/table-walkthrough/src/test/java/org/apache/flink/playgrounds/spendreport/SpendReportTest.java
On Thu, Oct 29, 2020 at 4:53 PM Dawid Wysakowicz <dw...@apache.org> wrote:
>
> You should be able to use the "print" sink. Remember though that the
> "print" sink prints into the stdout/stderr of TaskManagers, not the
> Client, where you submit the query. This is different from the
> TableResult, which collects results in the client. BTW, for printing you
> can use TableResult#print, which will nicely format your results.
>
> Best,
>
> Dawid
>
> On 29/10/2020 16:13, Ruben Laguna wrote:
> > How can I use the Table [Print SQL connector][1]? I tried the
> > following (batch mode) but it does not give any output:
> >
> >
> > EnvironmentSettings settings =
> > EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
> > TableEnvironment tEnv = TableEnvironment.create(settings);
> >
> > final LocalDateTime DATE_TIME = LocalDateTime.of(2020, 1, 1, 0, 0);
> >
> > Table transactions =
> > tEnv.fromValues(
> > DataTypes.ROW(
> > DataTypes.FIELD("account_id", DataTypes.BIGINT()),
> > DataTypes.FIELD("amount", DataTypes.BIGINT()),
> > DataTypes.FIELD("transaction_time",
> > DataTypes.TIMESTAMP(3))),
> > Row.of(1, 188, DATE_TIME.plusMinutes(12)),
> > Row.of(2, 374, DATE_TIME.plusMinutes(47)),
> > Row.of(3, 112, DATE_TIME.plusMinutes(36)),
> > Row.of(4, 478, DATE_TIME.plusMinutes(3)),
> > Row.of(5, 208, DATE_TIME.plusMinutes(8)),
> > Row.of(1, 379, DATE_TIME.plusMinutes(53)),
> > Row.of(2, 351, DATE_TIME.plusMinutes(32)),
> > Row.of(3, 320, DATE_TIME.plusMinutes(31)),
> > Row.of(4, 259, DATE_TIME.plusMinutes(19)),
> > Row.of(5, 273, DATE_TIME.plusMinutes(42)));
> > tEnv.executeSql("CREATE TABLE print_table(account_id BIGINT, amount
> > BIGINT, transaction_time TIMESTAMP) WITH ('connector' = 'print')");
> >
> > transactions.executeInsert("print_table");
> >
> >
> > I can "materialize" the result manually and print them out with :
> >
> > for (Row row : materialize(transactions.execute())) {
> > System.out.println(row);
> > }
> >
> > private static List<Row> materialize(TableResult results) {
> > try (CloseableIterator<Row> resultIterator = results.collect()) {
> > return StreamSupport
> >
> > .stream(Spliterators.spliteratorUnknownSize(resultIterator,
> > Spliterator.ORDERED), false)
> > .collect(Collectors.toList());
> > } catch (Exception e) {
> > throw new RuntimeException("Failed to materialize results", e);
> > }
> > }
> >
> >
> > But I would like to know why I can't just use the Print sink.
> >
> > I've tried with `.inBatchMode()` and with `inStreamingMode()`, so I
> > don't thinks it's that.
> >
> > Does anybody know of any working example involving the print connector?
> >
> >
> >
> > [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/print.html
>
--
/Rubén
Re: Table Print SQL Connector
Posted by Dawid Wysakowicz <dw...@apache.org>.
You should be able to use the "print" sink. Remember though that the
"print" sink prints into the stdout/stderr of TaskManagers, not the
Client, where you submit the query. This is different from the
TableResult, which collects results in the client. BTW, for printing you
can use TableResult#print, which will nicely format your results.
Best,
Dawid
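To make the distinction in the reply above concrete, here is a minimal plain-Java model. It is not Flink code: the names PrintSinkModel, Worker, runPrintSink and runCollect are hypothetical and only illustrate where rows end up. A print-style sink writes to the worker's own stdout, which the submitting client never reads, while collecting hands the rows back to the caller, which is roughly what TableResult does for the client.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PrintSinkModel {

    /** Hypothetical stand-in for a TaskManager that owns its own stdout. */
    static final class Worker {
        private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        private final PrintStream stdout = new PrintStream(buffer, true);

        /** Models a print-style sink: rows go to the worker's stdout only. */
        void runPrintSink(List<String> rows) {
            for (String row : rows) {
                stdout.println(row);
            }
        }

        /** Models collecting results: rows are returned to the caller. */
        List<String> runCollect(List<String> rows) {
            return new ArrayList<>(rows);
        }

        /** Everything the worker has written to its own stdout so far. */
        String capturedStdout() {
            return buffer.toString();
        }
    }

    public static void main(String[] args) {
        // Illustrative values taken from the first two rows of the question.
        List<String> rows = Arrays.asList("1,188", "2,374");
        Worker worker = new Worker();

        // The sink returns nothing to the client; the output stays with the worker.
        worker.runPrintSink(rows);
        System.out.println("worker stdout holds: " + worker.capturedStdout().trim());

        // Collecting brings the rows back, so the client can print them itself.
        for (String row : worker.runCollect(rows)) {
            System.out.println("client received: " + row);
        }
    }
}
```

In this model the client only sees rows it explicitly collects; anything the sink printed has to be read from the worker's side.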