Posted to user@flink.apache.org by Hao Sun <ha...@zendesk.com> on 2017/09/26 03:49:36 UTC

CheckpointConfig says to persist periodic checkpoints, but no checkpoint directory has been configured.

Hi, I am running Flink in dev mode through IntelliJ. I have flink-conf.yaml
correctly configured, and from the log you can see the job manager is reading it.

2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.backend, rocksdb
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.backend.fs.checkpointdir, /tmp/flink/checkpoints/
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.checkpoints.dir, /tmp/flink/checkpoints-meta/
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.savepoints.dir, /tmp/flink/savepoints/

But I still somehow get this error:
java.lang.IllegalStateException: CheckpointConfig says to persist periodic checkpoints, but no checkpoint directory has been configured. You can configure configure one via key 'state.checkpoints.dir'.
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:209)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:451)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:278)
at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1315)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


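My program only has this related to checkpointing:

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
env.enableCheckpointing(2 * 60 * 1000) // checkpoint every 2 minutes (interval in ms)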


I need some help digging through this. Thanks!

=================== Full log =================

2017-09-25 20:41:51.466 [ForkJoinPool-1-worker-13] INFO  com.zendesk.consul.Consul  - Collecting kafka nodes from Consul(consul.docker:8500) for tags=List(dev)
2017-09-25 20:41:51.946 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class com.zendesk.fraud_prevention.data_types.MaxwellFilterEvents does not contain a setter for field events
2017-09-25 20:41:51.946 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class com.zendesk.fraud_prevention.data_types.MaxwellFilterEvents is not a valid POJO type because not all fields are valid POJO fields.
2017-09-25 20:41:51.985 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class com.zendesk.fraud_prevention.data_types.MaxwellFPSEvent does not contain a setter for field accountId
2017-09-25 20:41:51.985 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class com.zendesk.fraud_prevention.data_types.MaxwellFPSEvent is not a valid POJO type because not all fields are valid POJO fields.
2017-09-25 20:41:52.017 [ForkJoinPool-1-worker-13] INFO  com.zendesk.consul.Consul  - Collecting kafka nodes from Consul(consul.docker:8500) for tags=List(dev)
2017-09-25 20:41:52.198 [main] INFO  o.a.flink.streaming.api.environment.LocalStreamEnvironment  - Running job on local embedded Flink mini cluster
2017-09-25 20:41:52.253 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: jobmanager.rpc.address, localhost
2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: jobmanager.rpc.port, 6123
2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: jobmanager.web.port, 8081
2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: jobmanager.heap.mb, 1024
2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: taskmanager.heap.mb, 1024
2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: taskmanager.memory.preallocate, false
2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: parallelism.default, 1
2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: fs.hdfs.hadoopconf, flink/conf
2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.backend, rocksdb
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.backend.fs.checkpointdir, /tmp/flink/checkpoints/
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.checkpoints.dir, /tmp/flink/checkpoints-meta/
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.savepoints.dir, /tmp/flink/savepoints/
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: taskmanager.log.path, /tmp/flink_logs/flink_console.log
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: jobmanager.web.log.path, /tmp/flink_logs/flink_console.log
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: high-availability, zookeeper
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: high-availability.zookeeper.quorum, 172.18.0.7:2181
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: high-availability.zookeeper.path.root, /flink
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: high-availability.zookeeper.path.cluster-id, /flink_default_ns
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: high-availability.zookeeper.storageDir, /tmp/flink/ha-recovery
2017-09-25 20:41:52.257 [main] INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster  - Disabled queryable state server
2017-09-25 20:41:52.271 [main] INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster  - Starting FlinkMiniCluster.
2017-09-25 20:41:52.442 [flink-akka.actor.default-dispatcher-4] INFO  akka.event.slf4j.Slf4jLogger  - Slf4jLogger started
2017-09-25 20:41:52.472 [main] INFO  org.apache.flink.runtime.blob.BlobServer  - Created BLOB server storage directory /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/blobStore-af52ff47-49d3-4307-bb4e-016c5e5d8a39
2017-09-25 20:41:52.477 [main] INFO  org.apache.flink.runtime.blob.BlobServer  - Started BLOB server at 0.0.0.0:56706 - max concurrent requests: 50 - max backlog: 1000
2017-09-25 20:41:52.487 [main] INFO  org.apache.flink.runtime.metrics.MetricRegistry  - No metrics reporter configured, no metrics will be exposed/reported.
2017-09-25 20:41:52.496 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.jobmanager.MemoryArchivist  - Started memory archivist akka://flink/user/archive_1
2017-09-25 20:41:52.501 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.jobmanager.JobManager  - Starting JobManager at akka://flink/user/jobmanager_1.
2017-09-25 20:41:52.501 [flink-akka.actor.default-dispatcher-3] INFO  o.a.f.r.h.nonha.embedded.EmbeddedLeaderService  - Proposing leadership to contender org.apache.flink.runtime.jobmanager.JobManager@5b4d1f99 @ akka://flink/user/jobmanager_1
2017-09-25 20:41:52.508 [main] INFO  o.apache.flink.runtime.taskexecutor.TaskManagerConfiguration  - Messages have a max timeout of 10000 ms
2017-09-25 20:41:52.514 [main] INFO  org.apache.flink.runtime.taskexecutor.TaskManagerServices  - Temporary file directory '/var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T': total 464 GB, usable 61 GB (13.15% usable)
2017-09-25 20:41:52.590 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.jobmanager.JobManager  - JobManager akka://flink/user/jobmanager_1 was granted leadership with leader session ID Some(d2f1c68f-2982-474f-8b7f-271d3f4e4192).
2017-09-25 20:41:52.592 [flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.h.nonha.embedded.EmbeddedLeaderService  - Received confirmation of leadership for leader akka://flink/user/jobmanager_1 , session=d2f1c68f-2982-474f-8b7f-271d3f4e4192
2017-09-25 20:41:52.602 [flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.c.standalone.StandaloneResourceManager  - Trying to associate with JobManager leader akka://flink/user/jobmanager_1
2017-09-25 20:41:52.610 [flink-akka.actor.default-dispatcher-3] INFO  o.a.f.r.c.standalone.StandaloneResourceManager  - Resource Manager associating with leading JobManager Actor[akka://flink/user/jobmanager_1#-1948249729] - leader session d2f1c68f-2982-474f-8b7f-271d3f4e4192
2017-09-25 20:41:52.899 [main] INFO  org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated 363 MB for network buffer pool (number of memory segments: 11620, bytes per segment: 32768).
2017-09-25 20:41:52.915 [main] INFO  org.apache.flink.runtime.io.network.NetworkEnvironment  - Starting the network environment and its components.
2017-09-25 20:41:52.917 [main] INFO  org.apache.flink.runtime.taskexecutor.TaskManagerServices  - Limiting managed memory to 1145 MB, memory will be allocated lazily.
2017-09-25 20:41:52.922 [main] INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager  - I/O manager uses directory /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/flink-io-698c06ae-3d56-4765-b9b8-8a5d8b70ffd6 for spill files.
2017-09-25 20:41:52.923 [main] INFO  org.apache.flink.runtime.metrics.MetricRegistry  - No metrics reporter configured, no metrics will be exposed/reported.
2017-09-25 20:41:52.963 [main] INFO  org.apache.flink.runtime.filecache.FileCache  - User file cache uses directory /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/flink-dist-cache-a99429d5-981d-4cd0-9eeb-5d6678c650f0
2017-09-25 20:41:52.973 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.filecache.FileCache  - User file cache uses directory /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/flink-dist-cache-dd359949-3837-461c-accc-41ebc67a1d8f
2017-09-25 20:41:52.974 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Starting TaskManager actor at akka://flink/user/taskmanager_1#1248014944.
2017-09-25 20:41:52.974 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - TaskManager data connection information: d7308d8350e736f55357e74e04f5c106 @ localhost (dataPort=-1)
2017-09-25 20:41:52.975 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - TaskManager has 8 task slot(s).
2017-09-25 20:41:52.976 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Memory usage stats: [HEAP: 385/684/3641 MB, NON HEAP: 35/36/-1 MB (used/committed/max)]
2017-09-25 20:41:52.980 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Trying to register at JobManager akka://flink/user/jobmanager_1 (attempt 1, timeout: 500 milliseconds)
2017-09-25 20:41:52.982 [flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.c.standalone.StandaloneResourceManager  - TaskManager d7308d8350e736f55357e74e04f5c106 has started.
2017-09-25 20:41:52.984 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.instance.InstanceManager  - Registered TaskManager at localhost (akka://flink/user/taskmanager_1) as 8ad7f698f504d459202cdb8a9d6a9b34. Current number of registered hosts is 1. Current number of alive task slots is 8.
2017-09-25 20:41:52.987 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Successful registration at JobManager (akka://flink/user/jobmanager_1), starting network stack and library cache.
2017-09-25 20:41:52.989 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Determined BLOB server address to be localhost/127.0.0.1:56706. Starting BLOB cache.
2017-09-25 20:41:52.990 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.blob.BlobCache  - Created BLOB cache storage directory /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/blobStore-196b0de7-b951-4074-8865-a138283e66c1
2017-09-25 20:41:52.999 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Received SubmitJobAndWait(JobGraph(jobId: 0f0d880310bc9098027c2e4877f999fb)) but there is no connection to a JobManager yet.
2017-09-25 20:41:52.999 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Received job Kafka 0.10 Example development (0f0d880310bc9098027c2e4877f999fb).
2017-09-25 20:41:53.000 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Disconnect from JobManager null.
2017-09-25 20:41:53.003 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Connect to JobManager Actor[akka://flink/user/jobmanager_1#-1948249729].
2017-09-25 20:41:53.003 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1948249729] with leader session id d2f1c68f-2982-474f-8b7f-271d3f4e4192.
2017-09-25 20:41:53.004 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Sending message to JobManager akka://flink/user/jobmanager_1 to submit job Kafka 0.10 Example development (0f0d880310bc9098027c2e4877f999fb) and wait for progress
2017-09-25 20:41:53.005 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Upload jar files to job manager akka://flink/user/jobmanager_1.
2017-09-25 20:41:53.007 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Submit job to the job manager akka://flink/user/jobmanager_1.
2017-09-25 20:41:53.009 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.jobmanager.JobManager  - Submitting job 0f0d880310bc9098027c2e4877f999fb (Kafka 0.10 Example development).
2017-09-25 20:41:53.014 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.jobmanager.JobManager  - Using restart strategy FixedDelayRestartStrategy(maxNumberRestartAttempts=1, delayBetweenRestartAttempts=10000) for 0f0d880310bc9098027c2e4877f999fb.
2017-09-25 20:41:53.025 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job recovers via failover strategy: full graph restart
2017-09-25 20:41:53.055 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.jobmanager.JobManager  - Running initialization on master for job Kafka 0.10 Example development (0f0d880310bc9098027c2e4877f999fb).
2017-09-25 20:41:53.055 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.jobmanager.JobManager  - Successfully ran initialization on master in 0 ms.
2017-09-25 20:41:53.071 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.jobmanager.JobManager  - No state backend has been configured, using default state backend (Memory / JobManager)
2017-09-25 20:41:53.078 [flink-akka.actor.default-dispatcher-2] ERROR org.apache.flink.runtime.jobmanager.JobManager  - Failed to submit job 0f0d880310bc9098027c2e4877f999fb (Kafka 0.10 Example development)
java.lang.IllegalStateException: CheckpointConfig says to persist periodic checkpoints, but no checkpoint directory has been configured. You can configure configure one via key 'state.checkpoints.dir'.
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:209)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:451)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:278)
at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1315)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2017-09-25 20:41:53.085 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Terminate JobClientActor.
2017-09-25 20:41:53.085 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Disconnect from JobManager Actor[akka://flink/user/jobmanager_1#-1948249729].
2017-09-25 20:41:53.086 [main] INFO  org.apache.flink.runtime.client.JobClient  - Job execution failed
2017-09-25 20:41:53.086 [main] INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster  - Stopping FlinkMiniCluster.
2017-09-25 20:41:53.090 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Stopping TaskManager akka://flink/user/taskmanager_1#1248014944.
2017-09-25 20:41:53.090 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.jobmanager.JobManager  - Stopping JobManager akka://flink/user/jobmanager_1.
2017-09-25 20:41:53.091 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Disassociating from JobManager
2017-09-25 20:41:53.093 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.blob.BlobCache  - Shutting down BlobCache
2017-09-25 20:41:53.103 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.blob.BlobServer  - Stopped BLOB server at 0.0.0.0:56706
2017-09-25 20:41:53.104 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager  - I/O manager removed spill file directory /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/flink-io-698c06ae-3d56-4765-b9b8-8a5d8b70ffd6
2017-09-25 20:41:53.104 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.io.network.NetworkEnvironment  - Shutting down the network environment and its components.
2017-09-25 20:41:53.122 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Task manager akka://flink/user/taskmanager_1 is completely shut down.
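The log itself narrows the problem down. GlobalConfiguration does load state.checkpoints.dir in the [main] thread. However, the JobManager later reports "No state backend has been configured, using default state backend (Memory / JobManager)", even though state.backend: rocksdb was among the loaded properties. That suggests the embedded JobManager never received the values from flink-conf.yaml. The job also calls enableExternalizedCheckpoints(...), and that combination matches what the exception describes.

The block below is a simplified model in plain Scala, not Flink's code, and it only illustrates the reasoning. parseFlinkConf approximates flink-conf.yaml as flat "key: value" lines, which matches the "Loading configuration property: key, value" log lines. failsOnSubmit restates the precondition from the error message. Both names are hypothetical.

```scala
// Simplified model for illustration only; this is NOT Flink's implementation.
object CheckpointDirCheck {
  // The key named in the exception message.
  val CheckpointsDirKey = "state.checkpoints.dir"

  // Approximates reading flink-conf.yaml as flat "key: value" lines
  // (not nested YAML). Text after '#' is treated as a comment, and lines
  // without a non-empty key and value are skipped.
  def parseFlinkConf(lines: Seq[String]): Map[String, String] =
    lines.flatMap { line =>
      val content = line.split("#", 2)(0).trim
      content.split(": ", 2) match {
        case Array(k, v) if k.trim.nonEmpty && v.trim.nonEmpty => Some(k.trim -> v.trim)
        case _ => None
      }
    }.toMap

  // Restates the error's precondition: externalized ("persisted") periodic
  // checkpoints need a checkpoint directory in the configuration that the
  // JobManager actually received, not merely in a file that was read somewhere.
  def failsOnSubmit(externalized: Boolean, jobManagerConf: Map[String, String]): Boolean =
    externalized && !jobManagerConf.contains(CheckpointsDirKey)
}
```

Under this model, a configuration parsed from the file would pass. An empty configuration, which is what the "No state backend has been configured" line hints the embedded JobManager had, fails as soon as externalized checkpoints are enabled.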

Re: CheckpointConfig says to persist periodic checkpoints, but no checkpoint directory has been configured.

Posted by Hao Sun <ha...@zendesk.com>.
Thanks, I will try that.

On Tue, Sep 26, 2017 at 8:24 AM Aljoscha Krettek <al...@apache.org>
wrote:

> I'm not sure whether the JM is reading it or not. But you can manually set
> the values on the Configuration using the setter methods.
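Aljoscha suggests setting the values on a Configuration with its setter methods and passing it to StreamExecutionEnvironment.createLocalEnvironment(int, Configuration). In Scala, that could look roughly like the sketch below.

The sketch makes some assumptions:
- It targets Flink 1.3.x, the version the log appears to come from, with the Scala streaming API on the classpath.
- The Scala companion object quoted below has no createLocalEnvironment overload that takes a Configuration. So the sketch calls the Java factory and wraps the result in the Scala environment.
- The object name LocalCheckpointDirSketch, the helper localConfig, the parallelism of 1, the placeholder pipeline, and the checkpoint path are illustrative and hypothetical.

```scala
import org.apache.flink.configuration.{Configuration, GlobalConfiguration}
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
import org.apache.flink.streaming.api.scala._

// Hypothetical example object; assumes Flink 1.3.x.
object LocalCheckpointDirSketch {

  // Builds the Configuration handed to the embedded mini cluster.
  // If confDir is given, start from the flink-conf.yaml found in that
  // directory (GlobalConfiguration.loadConfiguration reads it); otherwise
  // start from an empty Configuration. Either way, set the checkpoint
  // directory explicitly so the JobManager is guaranteed to see it.
  def localConfig(checkpointDir: String, confDir: Option[String] = None): Configuration = {
    val conf = confDir
      .map(dir => GlobalConfiguration.loadConfiguration(dir))
      .getOrElse(new Configuration())
    conf.setString("state.checkpoints.dir", checkpointDir)
    conf
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical path; any directory the local process can write to works.
    val conf = localConfig("file:///tmp/flink/checkpoints-meta/")

    // Java factory that accepts a Configuration, wrapped in the Scala API.
    val env = new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(1, conf))

    // Same checkpoint settings as in the original program.
    env.getCheckpointConfig.enableExternalizedCheckpoints(
      ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    env.enableCheckpointing(2 * 60 * 1000)

    // Placeholder pipeline so the job graph is not empty; replace with the real job.
    env.fromElements(1, 2, 3).print()
    env.execute("local checkpoint dir sketch")
  }
}
```

Other keys from flink-conf.yaml can be set the same way. Passing a confDir to loadConfiguration avoids parsing the YAML by hand. For this particular flink-conf.yaml, though, loading the whole file would also carry high-availability: zookeeper and the ZooKeeper quorum into the local run. That could make the mini cluster try to reach ZooKeeper, so setting only the keys a local run needs is probably the safer default.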
>
>
> On 26. Sep 2017, at 16:58, Hao Sun <ha...@zendesk.com> wrote:
>
> Thanks Aljoscha, I still have questions.
> Do I have to parse the YAML into a Configuration? If the JM is not reading
> the config, who is reading it? The thread is [main] according to the logs.
> Why doesn't the JM read the config file by default?
>
> def createLocalEnvironment(parallelism: Int = JavaEnv.getDefaultLocalParallelism):
>     StreamExecutionEnvironment = {
>   new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism))
> }
>
> @PublicEvolving
> def createLocalEnvironmentWithWebUI(config: Configuration = null): StreamExecutionEnvironment = {
>   val conf: Configuration = if (config == null) new Configuration() else config
>   new StreamExecutionEnvironment(JavaEnv.createLocalEnvironmentWithWebUI(conf))
> }
>
>
> On Tue, Sep 26, 2017 at 6:25 AM Aljoscha Krettek <al...@apache.org>
> wrote:
>
>> Hi,
>>
>> I think the GlobalConfiguration is not necessarily read by the (local)
>> JobManager. You could try using
>> StreamExecutionEnvironment.createLocalEnvironment(int, Configuration) to
>> manually specify a configuration.
>>
>> Best,
>> Aljoscha
>>
>> On 26. Sep 2017, at 05:49, Hao Sun <ha...@zendesk.com> wrote:
>>
>> Hi I am running flink in dev mode through Intellij, I have
>> flink-conf.yaml correctly configured and from the log you can see job
>> manager is reading it.
>>
>> 2017-09-25 20:41:52.255 [main] INFO
>>  org.apache.flink.configuration.GlobalConfiguration  - *Loading
>> configuration property: state.backend, rocksdb*
>> 2017-09-25 20:41:52.256 [main] INFO
>>  org.apache.flink.configuration.GlobalConfiguration  - Loading
>> configuration property: state.backend.fs.checkpointdir,
>> /tmp/flink/checkpoints/
>> 2017-09-25 20:41:52.256 [main] INFO
>>  org.apache.flink.configuration.GlobalConfiguration  - *Loading
>> configuration property: state.checkpoints.dir, /tmp/flink/checkpoints-meta/*
>> 2017-09-25 20:41:52.256 [main] INFO
>>  org.apache.flink.configuration.GlobalConfiguration  - Loading
>> configuration property: state.savepoints.dir, /tmp/flink/savepoints/
>>
>> *But I still somehow get this error*
>> java.lang.IllegalStateException: CheckpointConfig says to persist
>> periodic checkpoints, but no checkpoint directory has been configured. You
>> can configure configure one via key 'state.checkpoints.dir'.
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:209)
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:451)
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:278)
>> at org.apache.flink.runtime.jobmanager.JobManager.org
>> <http://org.apache.flink.runtime.jobmanager.jobmanager.org/>
>> $apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1315)
>> at
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495)
>> at
>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>> at
>> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
>> at
>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>> at
>> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>> at
>> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>> at
>> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>> at
>> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>> at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>>
>> *My program only has this related to checkpointing*
>>
>> val env = StreamExecutionEnvironment.*getExecutionEnvironment
>> *env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)env.enableCheckpointing(2 * 60 * 1000)
>>
>>
>> Need some help to dig through this. Thanks
>>
>> =================== Full log =================
>>
>> 2017-09-25 20:41:51.466 [ForkJoinPool-1-worker-13] INFO
>>  com.zendesk.consul.Consul  - Collecting kafka nodes from
>> Consul(consul.docker:8500) for tags=List(dev)
>> 2017-09-25 20:41:51.946 [main] INFO
>>  org.apache.flink.api.java.typeutils.TypeExtractor  - class
>> com.zendesk.fraud_prevention.data_types.MaxwellFilterEvents does not
>> contain a setter for field events
>> 2017-09-25 20:41:51.946 [main] INFO
>>  org.apache.flink.api.java.typeutils.TypeExtractor  - class
>> com.zendesk.fraud_prevention.data_types.MaxwellFilterEvents is not a valid
>> POJO type because not all fields are valid POJO fields.
>> 2017-09-25 20:41:51.985 [main] INFO
>>  org.apache.flink.api.java.typeutils.TypeExtractor  - class
>> com.zendesk.fraud_prevention.data_types.MaxwellFPSEvent does not contain a
>> setter for field accountId
>> 2017-09-25 20:41:51.985 [main] INFO
>>  org.apache.flink.api.java.typeutils.TypeExtractor  - class
>> com.zendesk.fraud_prevention.data_types.MaxwellFPSEvent is not a valid POJO
>> type because not all fields are valid POJO fields.
>> 2017-09-25 20:41:52.017 [ForkJoinPool-1-worker-13] INFO
>>  com.zendesk.consul.Consul  - Collecting kafka nodes from
>> Consul(consul.docker:8500) for tags=List(dev)
>> 2017-09-25 20:41:52.198 [main] INFO
>>  o.a.flink.streaming.api.environment.LocalStreamEnvironment  - Running job
>> on local embedded Flink mini cluster
>> 2017-09-25 20:41:52.253 [main] INFO
>>  org.apache.flink.configuration.GlobalConfiguration  - Loading
>> configuration property: jobmanager.rpc.address, localhost
>> 2017-09-25 20:41:52.255 [main] INFO
>>  org.apache.flink.configuration.GlobalConfiguration  - Loading
>> configuration property: jobmanager.rpc.port, 6123
>> 2017-09-25 20:41:52.255 [main] INFO
>>  org.apache.flink.configuration.GlobalConfiguration  - Loading
>> configuration property: jobmanager.web.port, 8081
>> 2017-09-25 20:41:52.255 [main] INFO
>>  org.apache.flink.configuration.GlobalConfiguration  - Loading
>> configuration property: jobmanager.heap.mb, 1024
>> 2017-09-25 20:41:52.255 [main] INFO
>>  org.apache.flink.configuration.GlobalConfiguration  - Loading
>> configuration property: taskmanager.heap.mb, 1024
>> 2017-09-25 20:41:52.255 [main] INFO
>>  org.apache.flink.configuration.GlobalConfiguration  - Loading
>> configuration property: taskmanager.memory.preallocate, false
>> 2017-09-25 20:41:52.255 [main] INFO
>>  org.apache.flink.configuration.GlobalConfiguration  - Loading
>> configuration property: taskmanager.numberOfTaskSlots, 1
>> 2017-09-25 20:41:52.255 [main] INFO
>>  org.apache.flink.configuration.GlobalConfiguration  - Loading
>> configuration property: parallelism.default, 1
>> 2017-09-25 20:41:52.255 [main] INFO
>>  org.apache.flink.configuration.GlobalConfiguration  - Loading
>> configuration property: fs.hdfs.hadoopconf, flink/conf
>> 2017-09-25 20:41:52.255 [main] INFO
>>  org.apache.flink.configuration.GlobalConfiguration  - Loading
>> configuration property: state.backend, rocksdb
>> 2017-09-25 20:41:52.256 [main] INFO
>>  org.apache.flink.configuration.GlobalConfiguration  - Loading
>> configuration property: state.backend.fs.checkpointdir,
>> /tmp/flink/checkpoints/
>> 2017-09-25 20:41:52.256 [main] INFO
>>  org.apache.flink.configuration.GlobalConfiguration  - Loading
>> configuration property: state.checkpoints.dir, /tmp/flink/checkpoints-meta/
>> 2017-09-25 20:41:52.256 [main] INFO
>>  org.apache.flink.configuration.GlobalConfiguration  - Loading
>> configuration property: state.savepoints.dir, /tmp/flink/savepoints/
>> 2017-09-25 20:41:52.256 [main] INFO
>>  org.apache.flink.configuration.GlobalConfiguration  - Loading
>> configuration property: taskmanager.log.path,
>> /tmp/flink_logs/flink_console.log
>> 2017-09-25 20:41:52.256 [main] INFO
>>  org.apache.flink.configuration.GlobalConfiguration  - Loading
>> configuration property: jobmanager.web.log.path,
>> /tmp/flink_logs/flink_console.log
>> 2017-09-25 20:41:52.256 [main] INFO
>>  org.apache.flink.configuration.GlobalConfiguration  - Loading
>> configuration property: high-availability, zookeeper
>> 2017-09-25 20:41:52.256 [main] INFO
>>  org.apache.flink.configuration.GlobalConfiguration  - Loading
>> configuration property: high-availability.zookeeper.quorum,
>> 172.18.0.7:2181 <http://172.18.0.7:2181/>
>> 2017-09-25 20:41:52.256 [main] INFO
>>  org.apache.flink.configuration.GlobalConfiguration  - Loading
>> configuration property: high-availability.zookeeper.path.root, /flink
>> 2017-09-25 20:41:52.256 [main] INFO
>>  org.apache.flink.configuration.GlobalConfiguration  - Loading
>> configuration property: high-availability.zookeeper.path.cluster-id,
>> /flink_default_ns
>> 2017-09-25 20:41:52.256 [main] INFO
>>  org.apache.flink.configuration.GlobalConfiguration  - Loading
>> configuration property: high-availability.zookeeper.storageDir,
>> /tmp/flink/ha-recovery
>> 2017-09-25 20:41:52.257 [main] INFO
>>  org.apache.flink.runtime.minicluster.FlinkMiniCluster  - Disabled
>> queryable state server
>> 2017-09-25 20:41:52.271 [main] INFO
>>  org.apache.flink.runtime.minicluster.FlinkMiniCluster  - Starting
>> FlinkMiniCluster.
>> 2017-09-25 20:41:52.442 [flink-akka.actor.default-dispatcher-4] INFO
>>  akka.event.slf4j.Slf4jLogger  - Slf4jLogger started
>> 2017-09-25 20:41:52.472 [main] INFO
>>  org.apache.flink.runtime.blob.BlobServer  - Created BLOB server storage
>> directory
>> /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/blobStore-af52ff47-49d3-4307-bb4e-016c5e5d8a39
>> 2017-09-25 20:41:52.477 [main] INFO
>>  org.apache.flink.runtime.blob.BlobServer  - Started BLOB server at
>> 0.0.0.0:56706 - max
>> concurrent requests: 50 - max backlog: 1000
>> 2017-09-25 20:41:52.487 [main] INFO
>>  org.apache.flink.runtime.metrics.MetricRegistry  - No metrics reporter
>> configured, no metrics will be exposed/reported.
>> 2017-09-25 20:41:52.496 [flink-akka.actor.default-dispatcher-4] INFO
>>  org.apache.flink.runtime.jobmanager.MemoryArchivist  - Started memory
>> archivist akka://flink/user/archive_1
>> 2017-09-25 20:41:52.501 [flink-akka.actor.default-dispatcher-3] INFO
>>  org.apache.flink.runtime.jobmanager.JobManager  - Starting JobManager at
>> akka://flink/user/jobmanager_1.
>> 2017-09-25 20:41:52.501 [flink-akka.actor.default-dispatcher-3] INFO
>>  o.a.f.r.h.nonha.embedded.EmbeddedLeaderService  - Proposing leadership to
>> contender org.apache.flink.runtime.jobmanager.JobManager@5b4d1f99 @
>> akka://flink/user/jobmanager_1
>> 2017-09-25 20:41:52.508 [main] INFO
>>  o.apache.flink.runtime.taskexecutor.TaskManagerConfiguration  - Messages
>> have a max timeout of 10000 ms
>> 2017-09-25 20:41:52.514 [main] INFO
>>  org.apache.flink.runtime.taskexecutor.TaskManagerServices  - Temporary
>> file directory '/var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T': total
>> 464 GB, usable 61 GB (13.15% usable)
>> 2017-09-25 20:41:52.590 [flink-akka.actor.default-dispatcher-3] INFO
>>  org.apache.flink.runtime.jobmanager.JobManager  - JobManager
>> akka://flink/user/jobmanager_1 was granted leadership with leader
>> session ID Some(d2f1c68f-2982-474f-8b7f-271d3f4e4192).
>> 2017-09-25 20:41:52.592 [flink-akka.actor.default-dispatcher-2] INFO
>>  o.a.f.r.h.nonha.embedded.EmbeddedLeaderService  - Received confirmation of
>> leadership for leader akka://flink/user/jobmanager_1 ,
>> session=d2f1c68f-2982-474f-8b7f-271d3f4e4192
>> 2017-09-25 20:41:52.602 [flink-akka.actor.default-dispatcher-2] INFO
>>  o.a.f.r.c.standalone.StandaloneResourceManager  - Trying to associate with
>> JobManager leader akka://flink/user/jobmanager_1
>> 2017-09-25 20:41:52.610 [flink-akka.actor.default-dispatcher-3] INFO
>>  o.a.f.r.c.standalone.StandaloneResourceManager  - Resource Manager
>> associating with leading JobManager Actor[
>> akka://flink/user/jobmanager_1#-1948249729] - leader session
>> d2f1c68f-2982-474f-8b7f-271d3f4e4192
>> 2017-09-25 20:41:52.899 [main] INFO
>>  org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated
>> 363 MB for network buffer pool (number of memory segments: 11620, bytes per
>> segment: 32768).
>> 2017-09-25 20:41:52.915 [main] INFO
>>  org.apache.flink.runtime.io.network.NetworkEnvironment  - Starting the
>> network environment and its components.
>> 2017-09-25 20:41:52.917 [main] INFO
>>  org.apache.flink.runtime.taskexecutor.TaskManagerServices  - Limiting
>> managed memory to 1145 MB, memory will be allocated lazily.
>> 2017-09-25 20:41:52.922 [main] INFO
>>  org.apache.flink.runtime.io.disk.iomanager.IOManager  - I/O manager uses
>> directory
>> /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/flink-io-698c06ae-3d56-4765-b9b8-8a5d8b70ffd6
>> for spill files.
>> 2017-09-25 20:41:52.923 [main] INFO
>>  org.apache.flink.runtime.metrics.MetricRegistry  - No metrics reporter
>> configured, no metrics will be exposed/reported.
>> 2017-09-25 20:41:52.963 [main] INFO
>>  org.apache.flink.runtime.filecache.FileCache  - User file cache uses
>> directory
>> /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/flink-dist-cache-a99429d5-981d-4cd0-9eeb-5d6678c650f0
>> 2017-09-25 20:41:52.973 [flink-akka.actor.default-dispatcher-2] INFO
>>  org.apache.flink.runtime.filecache.FileCache  - User file cache uses
>> directory
>> /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/flink-dist-cache-dd359949-3837-461c-accc-41ebc67a1d8f
>> 2017-09-25 20:41:52.974 [flink-akka.actor.default-dispatcher-2] INFO
>>  org.apache.flink.runtime.taskmanager.TaskManager  - Starting TaskManager
>> actor at akka://flink/user/taskmanager_1#1248014944.
>> 2017-09-25 20:41:52.974 [flink-akka.actor.default-dispatcher-2] INFO
>>  org.apache.flink.runtime.taskmanager.TaskManager  - TaskManager data
>> connection information: d7308d8350e736f55357e74e04f5c106 @ localhost
>> (dataPort=-1)
>> 2017-09-25 20:41:52.975 [flink-akka.actor.default-dispatcher-2] INFO
>>  org.apache.flink.runtime.taskmanager.TaskManager  - TaskManager has 8 task
>> slot(s).
>> 2017-09-25 20:41:52.976 [flink-akka.actor.default-dispatcher-2] INFO
>>  org.apache.flink.runtime.taskmanager.TaskManager  - Memory usage stats:
>> [HEAP: 385/684/3641 MB, NON HEAP: 35/36/-1 MB (used/committed/max)]
>> 2017-09-25 20:41:52.980 [flink-akka.actor.default-dispatcher-2] INFO
>>  org.apache.flink.runtime.taskmanager.TaskManager  - Trying to register at
>> JobManager akka://flink/user/jobmanager_1 (attempt 1, timeout: 500
>> milliseconds)
>> 2017-09-25 20:41:52.982 [flink-akka.actor.default-dispatcher-2] INFO
>>  o.a.f.r.c.standalone.StandaloneResourceManager  - TaskManager
>> d7308d8350e736f55357e74e04f5c106 has started.
>> 2017-09-25 20:41:52.984 [flink-akka.actor.default-dispatcher-3] INFO
>>  org.apache.flink.runtime.instance.InstanceManager  - Registered
>> TaskManager at localhost (akka://flink/user/taskmanager_1) as
>> 8ad7f698f504d459202cdb8a9d6a9b34. Current number of registered hosts is 1.
>> Current number of alive task slots is 8.
>> 2017-09-25 20:41:52.987 [flink-akka.actor.default-dispatcher-2] INFO
>>  org.apache.flink.runtime.taskmanager.TaskManager  - Successful
>> registration at JobManager (akka://flink/user/jobmanager_1), starting
>> network stack and library cache.
>> 2017-09-25 20:41:52.989 [flink-akka.actor.default-dispatcher-2] INFO
>>  org.apache.flink.runtime.taskmanager.TaskManager  - Determined BLOB server
>> address to be localhost/127.0.0.1:56706. Starting BLOB cache.
>> 2017-09-25 20:41:52.990 [flink-akka.actor.default-dispatcher-2] INFO
>>  org.apache.flink.runtime.blob.BlobCache  - Created BLOB cache storage
>> directory
>> /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/blobStore-196b0de7-b951-4074-8865-a138283e66c1
>> 2017-09-25 20:41:52.999 [flink-akka.actor.default-dispatcher-3] INFO
>>  org.apache.flink.runtime.client.JobSubmissionClientActor  - Received
>> SubmitJobAndWait(JobGraph(jobId: 0f0d880310bc9098027c2e4877f999fb)) but
>> there is no connection to a JobManager yet.
>> 2017-09-25 20:41:52.999 [flink-akka.actor.default-dispatcher-3] INFO
>>  org.apache.flink.runtime.client.JobSubmissionClientActor  - Received job
>> Kafka 0.10 Example development (0f0d880310bc9098027c2e4877f999fb).
>> 2017-09-25 20:41:53.000 [flink-akka.actor.default-dispatcher-3] INFO
>>  org.apache.flink.runtime.client.JobSubmissionClientActor  - Disconnect
>> from JobManager null.
>> 2017-09-25 20:41:53.003 [flink-akka.actor.default-dispatcher-2] INFO
>>  org.apache.flink.runtime.client.JobSubmissionClientActor  - Connect to
>> JobManager Actor[akka://flink/user/jobmanager_1#-1948249729].
>> 2017-09-25 20:41:53.003 [flink-akka.actor.default-dispatcher-2] INFO
>>  org.apache.flink.runtime.client.JobSubmissionClientActor  - Connected to
>> JobManager at Actor[akka://flink/user/jobmanager_1#-1948249729] with
>> leader session id d2f1c68f-2982-474f-8b7f-271d3f4e4192.
>> 2017-09-25 20:41:53.004 [flink-akka.actor.default-dispatcher-2] INFO
>>  org.apache.flink.runtime.client.JobSubmissionClientActor  - Sending
>> message to JobManager akka://flink/user/jobmanager_1 to submit job Kafka
>> 0.10 Example development (0f0d880310bc9098027c2e4877f999fb) and wait for
>> progress
>> 2017-09-25 20:41:53.005 [flink-akka.actor.default-dispatcher-3] INFO
>>  org.apache.flink.runtime.client.JobSubmissionClientActor  - Upload jar
>> files to job manager akka://flink/user/jobmanager_1.
>> 2017-09-25 20:41:53.007 [flink-akka.actor.default-dispatcher-3] INFO
>>  org.apache.flink.runtime.client.JobSubmissionClientActor  - Submit job to
>> the job manager akka://flink/user/jobmanager_1.
>> 2017-09-25 20:41:53.009 [flink-akka.actor.default-dispatcher-2] INFO
>>  org.apache.flink.runtime.jobmanager.JobManager  - Submitting job
>> 0f0d880310bc9098027c2e4877f999fb (Kafka 0.10 Example development).
>> 2017-09-25 20:41:53.014 [flink-akka.actor.default-dispatcher-2] INFO
>>  org.apache.flink.runtime.jobmanager.JobManager  - Using restart strategy
>> FixedDelayRestartStrategy(maxNumberRestartAttempts=1,
>> delayBetweenRestartAttempts=10000) for 0f0d880310bc9098027c2e4877f999fb.
>> 2017-09-25 20:41:53.025 [flink-akka.actor.default-dispatcher-2] INFO
>>  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job recovers via
>> failover strategy: full graph restart
>> 2017-09-25 20:41:53.055 [flink-akka.actor.default-dispatcher-2] INFO
>>  org.apache.flink.runtime.jobmanager.JobManager  - Running initialization
>> on master for job Kafka 0.10 Example development
>> (0f0d880310bc9098027c2e4877f999fb).
>> 2017-09-25 20:41:53.055 [flink-akka.actor.default-dispatcher-2] INFO
>>  org.apache.flink.runtime.jobmanager.JobManager  - Successfully ran
>> initialization on master in 0 ms.
>> 2017-09-25 20:41:53.071 [flink-akka.actor.default-dispatcher-2] INFO
>>  org.apache.flink.runtime.jobmanager.JobManager  - No state backend has
>> been configured, using default state backend (Memory / JobManager)
>> 2017-09-25 20:41:53.078 [flink-akka.actor.default-dispatcher-2] ERROR
>> org.apache.flink.runtime.jobmanager.JobManager  - Failed to submit job
>> 0f0d880310bc9098027c2e4877f999fb (Kafka 0.10 Example development)
>> java.lang.IllegalStateException: CheckpointConfig says to persist
>> periodic checkpoints, but no checkpoint directory has been configured. You
>> can configure configure one via key 'state.checkpoints.dir'.
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:209)
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:451)
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:278)
>> at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1315)
>> at
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495)
>> at
>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>> at
>> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
>> at
>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>> at
>> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>> at
>> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>> at
>> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>> at
>> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>> at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> 2017-09-25 20:41:53.085 [flink-akka.actor.default-dispatcher-3] INFO
>>  org.apache.flink.runtime.client.JobSubmissionClientActor  - Terminate
>> JobClientActor.
>> 2017-09-25 20:41:53.085 [flink-akka.actor.default-dispatcher-3] INFO
>>  org.apache.flink.runtime.client.JobSubmissionClientActor  - Disconnect
>> from JobManager Actor[akka://flink/user/jobmanager_1#-1948249729].
>> 2017-09-25 20:41:53.086 [main] INFO
>>  org.apache.flink.runtime.client.JobClient  - Job execution failed
>> 2017-09-25 20:41:53.086 [main] INFO
>>  org.apache.flink.runtime.minicluster.FlinkMiniCluster  - Stopping
>> FlinkMiniCluster.
>> 2017-09-25 20:41:53.090 [flink-akka.actor.default-dispatcher-2] INFO
>>  org.apache.flink.runtime.taskmanager.TaskManager  - Stopping TaskManager
>> akka://flink/user/taskmanager_1#1248014944.
>> 2017-09-25 20:41:53.090 [flink-akka.actor.default-dispatcher-4] INFO
>>  org.apache.flink.runtime.jobmanager.JobManager  - Stopping JobManager
>> akka://flink/user/jobmanager_1.
>> 2017-09-25 20:41:53.091 [flink-akka.actor.default-dispatcher-2] INFO
>>  org.apache.flink.runtime.taskmanager.TaskManager  - Disassociating from
>> JobManager
>> 2017-09-25 20:41:53.093 [flink-akka.actor.default-dispatcher-2] INFO
>>  org.apache.flink.runtime.blob.BlobCache  - Shutting down BlobCache
>> 2017-09-25 20:41:53.103 [flink-akka.actor.default-dispatcher-4] INFO
>>  org.apache.flink.runtime.blob.BlobServer  - Stopped BLOB server at
>> 0.0.0.0:56706
>> 2017-09-25 20:41:53.104 [flink-akka.actor.default-dispatcher-2] INFO
>>  org.apache.flink.runtime.io.disk.iomanager.IOManager  - I/O manager
>> removed spill file directory
>> /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/flink-io-698c06ae-3d56-4765-b9b8-8a5d8b70ffd6
>> 2017-09-25 20:41:53.104 [flink-akka.actor.default-dispatcher-2] INFO
>>  org.apache.flink.runtime.io.network.NetworkEnvironment  - Shutting down
>> the network environment and its components.
>> 2017-09-25 20:41:53.122 [flink-akka.actor.default-dispatcher-2] INFO
>>  org.apache.flink.runtime.taskmanager.TaskManager  - Task manager
>> akka://flink/user/taskmanager_1 is completely shut down.
>>
>>
>>
>

Re: CheckpointConfig says to persist periodic checkpoints, but no checkpoint directory has been configured.

Posted by Aljoscha Krettek <al...@apache.org>.
I'm not sure whether the JM is reading it or not, but you can set the values on the Configuration manually using its setter methods.
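A minimal sketch of that suggestion, in Scala to match the thread and assuming Flink 1.3.x with the Scala DataStream API on the classpath. Because the Scala `createLocalEnvironment` quoted below takes no `Configuration`, the sketch calls the Java `StreamExecutionEnvironment.createLocalEnvironment(int, Configuration)` and wraps the result in the Scala environment, the same way the quoted Scala factory wraps its Java counterpart. The object name `LocalEnvWithCheckpointDir` is hypothetical; the directory values are copied from the flink-conf.yaml shown in the log.

```scala
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
import org.apache.flink.streaming.api.scala._

// Hypothetical helper: builds the local environment from an explicit Configuration
// instead of relying on flink-conf.yaml reaching the embedded JobManager.
object LocalEnvWithCheckpointDir {

  def localConfig(): Configuration = {
    val conf = new Configuration()
    // The key named in the IllegalStateException.
    conf.setString("state.checkpoints.dir", "/tmp/flink/checkpoints-meta/")
    // Other keys from flink-conf.yaml can be set the same way.
    conf.setString("state.savepoints.dir", "/tmp/flink/savepoints/")
    conf
  }

  def createEnv(parallelism: Int): StreamExecutionEnvironment = {
    // Java factory that accepts a Configuration, wrapped for the Scala API.
    val env = new StreamExecutionEnvironment(
      JavaEnv.createLocalEnvironment(parallelism, localConfig()))
    // Same checkpoint settings as in the original program.
    env.getCheckpointConfig.enableExternalizedCheckpoints(
      ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    env.enableCheckpointing(2 * 60 * 1000)
    env
  }
}
```

Calling `LocalEnvWithCheckpointDir.createEnv(1)` in place of `StreamExecutionEnvironment.getExecutionEnvironment` should hand `state.checkpoints.dir` to the local cluster directly. Setting `state.backend` to `rocksdb` the same way would additionally need the RocksDB state backend dependency on the classpath.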


> On 26. Sep 2017, at 16:58, Hao Sun <ha...@zendesk.com> wrote:
> 
> Thanks Aljoscha, I still have questions.
> Do I have to parse the YAML into a Configuration myself? If the JM is not reading the config, who is? According to the logs, the thread is [main].
> Why doesn't the JM read the config file by default?
> def createLocalEnvironment(parallelism: Int = JavaEnv.getDefaultLocalParallelism):
>     StreamExecutionEnvironment = {
>   new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism))
> }
> 
> @PublicEvolving
> def createLocalEnvironmentWithWebUI(config: Configuration = null): StreamExecutionEnvironment = {
>   val conf: Configuration = if (config == null) new Configuration() else config
>   new StreamExecutionEnvironment(JavaEnv.createLocalEnvironmentWithWebUI(conf))
> }
> 
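On the question of parsing the YAML: a sketch, assuming Flink 1.3.x, that lets `GlobalConfiguration.loadConfiguration(String)` read flink-conf.yaml from a directory and passes the resulting `Configuration` to the Java `createLocalEnvironment(int, Configuration)`, wrapped for the Scala API. The object name `LocalEnvFromConfDir` and whatever directory you pass in are hypothetical. Note that this forwards every key in the file to the local cluster, including the `high-availability: zookeeper` settings visible in the log, so setting only the needed keys by hand, as Aljoscha suggests, may be the safer choice for local runs.

```scala
import org.apache.flink.configuration.{Configuration, GlobalConfiguration}
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
import org.apache.flink.streaming.api.scala._

// Hypothetical helper: reuse flink-conf.yaml for a local (IDE) run.
object LocalEnvFromConfDir {

  // Reads <confDir>/flink-conf.yaml into a Configuration; no hand-written YAML parsing.
  def loadConf(confDir: String): Configuration =
    GlobalConfiguration.loadConfiguration(confDir)

  // confDir is the directory that holds your flink-conf.yaml (project-specific).
  def createEnv(parallelism: Int, confDir: String): StreamExecutionEnvironment =
    new StreamExecutionEnvironment(
      JavaEnv.createLocalEnvironment(parallelism, loadConf(confDir)))
}
```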
> On Tue, Sep 26, 2017 at 6:25 AM Aljoscha Krettek <aljoscha@apache.org> wrote:
> Hi,
> 
> I think the GlobalConfiguration is not necessarily read by the (local) JobManager. You could try using StreamExecutionEnvironment.createLocalEnvironment(int, Configuration) to manually specify a configuration.
> 
> Best,
> Aljoscha
> 
>> On 26. Sep 2017, at 05:49, Hao Sun <hasun@zendesk.com> wrote:
>> 
>> Hi, I am running Flink in dev mode through IntelliJ. I have flink-conf.yaml correctly configured, and from the log you can see the job manager is reading it.
>> 
>> 2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.backend, rocksdb
>> 2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.backend.fs.checkpointdir, /tmp/flink/checkpoints/
>> 2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.checkpoints.dir, /tmp/flink/checkpoints-meta/
>> 2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.savepoints.dir, /tmp/flink/savepoints/
>> 
>> But somehow I still get this error:
>> java.lang.IllegalStateException: CheckpointConfig says to persist periodic checkpoints, but no checkpoint directory has been configured. You can configure configure one via key 'state.checkpoints.dir'.
>> 	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:209)
>> 	at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:451)
>> 	at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:278)
>> 	at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1315)
>> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495)
>> 	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>> 	at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
>> 	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>> 	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>> 	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>> 	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>> 	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>> 	at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>> 	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
>> 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>> 	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>> 	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> 
>> 
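>> The only checkpointing-related code in my program is:
>> import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
>> import org.apache.flink.streaming.api.scala._
>> 
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>> env.enableCheckpointing(2 * 60 * 1000)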
>> 
>> I need some help digging into this. Thanks
>> 
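The full log offers a hint. The embedded cluster reports `TaskManager has 8 task slot(s)` although the file sets `taskmanager.numberOfTaskSlots: 1`, it uses `EmbeddedLeaderService` although `high-availability` is `zookeeper`, and the JobManager logs `No state backend has been configured`. That suggests the values loaded by `GlobalConfiguration` never reach the local JobManager. Judging by the exception text, the failing check is roughly this: externalized checkpoints are enabled (here via `enableExternalizedCheckpoints(RETAIN_ON_CANCELLATION)`), but the JobManager's own configuration has no `state.checkpoints.dir`. The Scala below is a simplified, hypothetical model of that condition, not Flink's source; `CheckpointDirCheck` and its methods are made up for illustration.

```scala
// Hypothetical model of the precondition behind the IllegalStateException.
object CheckpointDirCheck {
  // Key named in the exception message.
  val CheckpointsDirKey = "state.checkpoints.dir"

  // The directory comes from the configuration the JobManager actually received
  // (modeled as a plain Map), not from whatever flink-conf.yaml was logged.
  def checkpointsDir(jobManagerConfig: Map[String, String]): Option[String] =
    jobManagerConfig.get(CheckpointsDirKey)

  // Fails only when externalized checkpoints are requested and no directory is known.
  def validate(externalizeCheckpoints: Boolean, jobManagerConfig: Map[String, String]): Unit =
    if (externalizeCheckpoints && checkpointsDir(jobManagerConfig).isEmpty)
      throw new IllegalStateException(
        s"Externalized checkpoints are enabled, but '$CheckpointsDirKey' is not set " +
          "in the JobManager configuration.")
}
```

For example, `CheckpointDirCheck.validate(true, Map.empty)` throws, while supplying `Map("state.checkpoints.dir" -> "/tmp/flink/checkpoints-meta/")` or passing `externalizeCheckpoints = false` does not.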
>> =================== Full log =================
>> 
>> 2017-09-25 20:41:51.466 [ForkJoinPool-1-worker-13] INFO  com.zendesk.consul.Consul  - Collecting kafka nodes from Consul(consul.docker:8500) for tags=List(dev)
>> 2017-09-25 20:41:51.946 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class com.zendesk.fraud_prevention.data_types.MaxwellFilterEvents does not contain a setter for field events
>> 2017-09-25 20:41:51.946 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class com.zendesk.fraud_prevention.data_types.MaxwellFilterEvents is not a valid POJO type because not all fields are valid POJO fields.
>> 2017-09-25 20:41:51.985 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class com.zendesk.fraud_prevention.data_types.MaxwellFPSEvent does not contain a setter for field accountId
>> 2017-09-25 20:41:51.985 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class com.zendesk.fraud_prevention.data_types.MaxwellFPSEvent is not a valid POJO type because not all fields are valid POJO fields.
>> 2017-09-25 20:41:52.017 [ForkJoinPool-1-worker-13] INFO  com.zendesk.consul.Consul  - Collecting kafka nodes from Consul(consul.docker:8500) for tags=List(dev)
>> 2017-09-25 20:41:52.198 [main] INFO  o.a.flink.streaming.api.environment.LocalStreamEnvironment  - Running job on local embedded Flink mini cluster
>> 2017-09-25 20:41:52.253 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: jobmanager.rpc.address, localhost
>> 2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: jobmanager.rpc.port, 6123
>> 2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: jobmanager.web.port, 8081
>> 2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: jobmanager.heap.mb, 1024
>> 2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: taskmanager.heap.mb, 1024
>> 2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: taskmanager.memory.preallocate, false
>> 2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: taskmanager.numberOfTaskSlots, 1
>> 2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: parallelism.default, 1
>> 2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: fs.hdfs.hadoopconf, flink/conf
>> 2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.backend, rocksdb
>> 2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.backend.fs.checkpointdir, /tmp/flink/checkpoints/
>> 2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.checkpoints.dir, /tmp/flink/checkpoints-meta/
>> 2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.savepoints.dir, /tmp/flink/savepoints/
>> 2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: taskmanager.log.path, /tmp/flink_logs/flink_console.log
>> 2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: jobmanager.web.log.path, /tmp/flink_logs/flink_console.log
>> 2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: high-availability, zookeeper
>> 2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: high-availability.zookeeper.quorum, 172.18.0.7:2181
>> 2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: high-availability.zookeeper.path.root, /flink
>> 2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: high-availability.zookeeper.path.cluster-id, /flink_default_ns
>> 2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: high-availability.zookeeper.storageDir, /tmp/flink/ha-recovery
>> 2017-09-25 20:41:52.257 [main] INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster  - Disabled queryable state server
>> 2017-09-25 20:41:52.271 [main] INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster  - Starting FlinkMiniCluster.
>> 2017-09-25 20:41:52.442 [flink-akka.actor.default-dispatcher-4] INFO  akka.event.slf4j.Slf4jLogger  - Slf4jLogger started
>> 2017-09-25 20:41:52.472 [main] INFO  org.apache.flink.runtime.blob.BlobServer  - Created BLOB server storage directory /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/blobStore-af52ff47-49d3-4307-bb4e-016c5e5d8a39
>> 2017-09-25 20:41:52.477 [main] INFO  org.apache.flink.runtime.blob.BlobServer  - Started BLOB server at 0.0.0.0:56706 - max concurrent requests: 50 - max backlog: 1000
>> 2017-09-25 20:41:52.487 [main] INFO  org.apache.flink.runtime.metrics.MetricRegistry  - No metrics reporter configured, no metrics will be exposed/reported.
>> 2017-09-25 20:41:52.496 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.jobmanager.MemoryArchivist  - Started memory archivist akka://flink/user/archive_1
>> 2017-09-25 20:41:52.501 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.jobmanager.JobManager  - Starting JobManager at akka://flink/user/jobmanager_1.
>> 2017-09-25 20:41:52.501 [flink-akka.actor.default-dispatcher-3] INFO  o.a.f.r.h.nonha.embedded.EmbeddedLeaderService  - Proposing leadership to contender org.apache.flink.runtime.jobmanager.JobManager@5b4d1f99 @ akka://flink/user/jobmanager_1
>> 2017-09-25 20:41:52.508 [main] INFO  o.apache.flink.runtime.taskexecutor.TaskManagerConfiguration  - Messages have a max timeout of 10000 ms
>> 2017-09-25 20:41:52.514 [main] INFO  org.apache.flink.runtime.taskexecutor.TaskManagerServices  - Temporary file directory '/var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T': total 464 GB, usable 61 GB (13.15% usable)
>> 2017-09-25 20:41:52.590 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.jobmanager.JobManager  - JobManager akka://flink/user/jobmanager_1 was granted leadership with leader session ID Some(d2f1c68f-2982-474f-8b7f-271d3f4e4192).
>> 2017-09-25 20:41:52.592 [flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.h.nonha.embedded.EmbeddedLeaderService  - Received confirmation of leadership for leader akka://flink/user/jobmanager_1 , session=d2f1c68f-2982-474f-8b7f-271d3f4e4192
>> 2017-09-25 20:41:52.602 [flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.c.standalone.StandaloneResourceManager  - Trying to associate with JobManager leader akka://flink/user/jobmanager_1
>> 2017-09-25 20:41:52.610 [flink-akka.actor.default-dispatcher-3] INFO  o.a.f.r.c.standalone.StandaloneResourceManager  - Resource Manager associating with leading JobManager Actor[akka://flink/user/jobmanager_1#-1948249729] - leader session d2f1c68f-2982-474f-8b7f-271d3f4e4192
>> 2017-09-25 20:41:52.899 [main] INFO  org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated 363 MB for network buffer pool (number of memory segments: 11620, bytes per segment: 32768).
>> 2017-09-25 20:41:52.915 [main] INFO  org.apache.flink.runtime.io.network.NetworkEnvironment  - Starting the network environment and its components.
>> 2017-09-25 20:41:52.917 [main] INFO  org.apache.flink.runtime.taskexecutor.TaskManagerServices  - Limiting managed memory to 1145 MB, memory will be allocated lazily.
>> 2017-09-25 20:41:52.922 [main] INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager  - I/O manager uses directory /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/flink-io-698c06ae-3d56-4765-b9b8-8a5d8b70ffd6 for spill files.
>> 2017-09-25 20:41:52.923 [main] INFO  org.apache.flink.runtime.metrics.MetricRegistry  - No metrics reporter configured, no metrics will be exposed/reported.
>> 2017-09-25 20:41:52.963 [main] INFO  org.apache.flink.runtime.filecache.FileCache  - User file cache uses directory /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/flink-dist-cache-a99429d5-981d-4cd0-9eeb-5d6678c650f0
>> 2017-09-25 20:41:52.973 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.filecache.FileCache  - User file cache uses directory /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/flink-dist-cache-dd359949-3837-461c-accc-41ebc67a1d8f
>> 2017-09-25 20:41:52.974 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Starting TaskManager actor at akka://flink/user/taskmanager_1#1248014944.
>> 2017-09-25 20:41:52.974 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - TaskManager data connection information: d7308d8350e736f55357e74e04f5c106 @ localhost (dataPort=-1)
>> 2017-09-25 20:41:52.975 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - TaskManager has 8 task slot(s).
>> 2017-09-25 20:41:52.976 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Memory usage stats: [HEAP: 385/684/3641 MB, NON HEAP: 35/36/-1 MB (used/committed/max)]
>> 2017-09-25 20:41:52.980 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Trying to register at JobManager akka://flink/user/jobmanager_1 (attempt 1, timeout: 500 milliseconds)
>> 2017-09-25 20:41:52.982 [flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.c.standalone.StandaloneResourceManager  - TaskManager d7308d8350e736f55357e74e04f5c106 has started.
>> 2017-09-25 20:41:52.984 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.instance.InstanceManager  - Registered TaskManager at localhost (akka://flink/user/taskmanager_1) as 8ad7f698f504d459202cdb8a9d6a9b34. Current number of registered hosts is 1. Current number of alive task slots is 8.
>> 2017-09-25 20:41:52.987 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Successful registration at JobManager (akka://flink/user/jobmanager_1), starting network stack and library cache.
>> 2017-09-25 20:41:52.989 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Determined BLOB server address to be localhost/127.0.0.1:56706. Starting BLOB cache.
>> 2017-09-25 20:41:52.990 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.blob.BlobCache  - Created BLOB cache storage directory /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/blobStore-196b0de7-b951-4074-8865-a138283e66c1
>> 2017-09-25 20:41:52.999 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Received SubmitJobAndWait(JobGraph(jobId: 0f0d880310bc9098027c2e4877f999fb)) but there is no connection to a JobManager yet.
>> 2017-09-25 20:41:52.999 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Received job Kafka 0.10 Example development (0f0d880310bc9098027c2e4877f999fb).
>> 2017-09-25 20:41:53.000 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Disconnect from JobManager null.
>> 2017-09-25 20:41:53.003 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Connect to JobManager Actor[akka://flink/user/jobmanager_1#-1948249729].
>> 2017-09-25 20:41:53.003 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1948249729] with leader session id d2f1c68f-2982-474f-8b7f-271d3f4e4192.
>> 2017-09-25 20:41:53.004 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Sending message to JobManager akka://flink/user/jobmanager_1 to submit job Kafka 0.10 Example development (0f0d880310bc9098027c2e4877f999fb) and wait for progress
>> 2017-09-25 20:41:53.005 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Upload jar files to job manager akka://flink/user/jobmanager_1.
>> 2017-09-25 20:41:53.007 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Submit job to the job manager akka://flink/user/jobmanager_1.
>> 2017-09-25 20:41:53.009 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.jobmanager.JobManager  - Submitting job 0f0d880310bc9098027c2e4877f999fb (Kafka 0.10 Example development).
>> 2017-09-25 20:41:53.014 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.jobmanager.JobManager  - Using restart strategy FixedDelayRestartStrategy(maxNumberRestartAttempts=1, delayBetweenRestartAttempts=10000) for 0f0d880310bc9098027c2e4877f999fb.
>> 2017-09-25 20:41:53.025 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job recovers via failover strategy: full graph restart
>> 2017-09-25 20:41:53.055 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.jobmanager.JobManager  - Running initialization on master for job Kafka 0.10 Example development (0f0d880310bc9098027c2e4877f999fb).
>> 2017-09-25 20:41:53.055 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.jobmanager.JobManager  - Successfully ran initialization on master in 0 ms.
>> 2017-09-25 20:41:53.071 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.jobmanager.JobManager  - No state backend has been configured, using default state backend (Memory / JobManager)
>> 2017-09-25 20:41:53.078 [flink-akka.actor.default-dispatcher-2] ERROR org.apache.flink.runtime.jobmanager.JobManager  - Failed to submit job 0f0d880310bc9098027c2e4877f999fb (Kafka 0.10 Example development)
>> java.lang.IllegalStateException: CheckpointConfig says to persist periodic checkpoints, but no checkpoint directory has been configured. You can configure configure one via key 'state.checkpoints.dir'.
>> 	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:209)
>> 	at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:451)
>> 	at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:278)
>> 	at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1315)
>> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495)
>> 	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>> 	at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
>> 	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>> 	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>> 	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>> 	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>> 	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>> 	at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>> 	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
>> 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>> 	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>> 	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> 2017-09-25 20:41:53.085 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Terminate JobClientActor.
>> 2017-09-25 20:41:53.085 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Disconnect from JobManager Actor[akka://flink/user/jobmanager_1#-1948249729].
>> 2017-09-25 20:41:53.086 [main] INFO  org.apache.flink.runtime.client.JobClient  - Job execution failed
>> 2017-09-25 20:41:53.086 [main] INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster  - Stopping FlinkMiniCluster.
>> 2017-09-25 20:41:53.090 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Stopping TaskManager akka://flink/user/taskmanager_1#1248014944.
>> 2017-09-25 20:41:53.090 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.jobmanager.JobManager  - Stopping JobManager akka://flink/user/jobmanager_1.
>> 2017-09-25 20:41:53.091 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Disassociating from JobManager
>> 2017-09-25 20:41:53.093 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.blob.BlobCache  - Shutting down BlobCache
>> 2017-09-25 20:41:53.103 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.blob.BlobServer  - Stopped BLOB server at 0.0.0.0:56706
>> 2017-09-25 20:41:53.104 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager  - I/O manager removed spill file directory /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/flink-io-698c06ae-3d56-4765-b9b8-8a5d8b70ffd6
>> 2017-09-25 20:41:53.104 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.io.network.NetworkEnvironment  - Shutting down the network environment and its components.
>> 2017-09-25 20:41:53.122 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Task manager akka://flink/user/taskmanager_1 is completely shut down.
> 


Re: CheckpointConfig says to persist periodic checkpoints, but no checkpoint directory has been configured.

Posted by Hao Sun <ha...@zendesk.com>.
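Thanks Aljoscha, I still have some questions.
Do I have to parse the YAML into a Configuration object myself? If the JM is
not reading the config, what is? According to the logs, it is loaded in the
[main] thread.
Why doesn't the JM read the config file by default?

Here are the relevant factory methods from the Scala StreamExecutionEnvironment: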

def createLocalEnvironment(parallelism: Int = JavaEnv.getDefaultLocalParallelism): StreamExecutionEnvironment = {
  new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism))
}

@PublicEvolving
def createLocalEnvironmentWithWebUI(config: Configuration = null): StreamExecutionEnvironment = {
  val conf: Configuration = if (config == null) new Configuration() else config
  new StreamExecutionEnvironment(JavaEnv.createLocalEnvironmentWithWebUI(conf))
}
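On the question of parsing the YAML by hand: Flink ships `GlobalConfiguration.loadConfiguration(String configDir)`, which reads `flink-conf.yaml` from the given directory into a `Configuration` (the same `GlobalConfiguration` class whose logger prints the "Loading configuration property" lines in the log). The following is a minimal sketch, assuming Flink 1.3-era APIs. `LoadFlinkConf` and `load` are hypothetical names, and the `state.checkpoints.dir` check is only there to fail early with a clearer message than the one in the stack trace.

```scala
import org.apache.flink.configuration.{Configuration, GlobalConfiguration}

object LoadFlinkConf {
  // Hypothetical helper (not part of Flink): loads <confDir>/flink-conf.yaml
  // into a Configuration and fails early if the checkpoint directory key is missing.
  def load(confDir: String): Configuration = {
    val conf: Configuration = GlobalConfiguration.loadConfiguration(confDir)
    // The key named in the IllegalStateException from the JobManager.
    require(
      conf.getString("state.checkpoints.dir", null) != null,
      s"state.checkpoints.dir is not set in $confDir/flink-conf.yaml")
    conf
  }
}
```

The returned `Configuration` still has to be handed to an environment factory that accepts one, such as `createLocalEnvironmentWithWebUI(config)` shown above or the Java `createLocalEnvironment(int, Configuration)`.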


On Tue, Sep 26, 2017 at 6:25 AM Aljoscha Krettek <al...@apache.org> wrote:

> Hi,
>
> I think the GlobalConfiguration is not necessarily read by the (local)
> JobManager. You could try using
> StreamExecutionEnvironment.createLocalEnvironment(int, Configuration) to
> manually specify a configuration.
>
> Best,
> Aljoscha
>
> On 26. Sep 2017, at 05:49, Hao Sun <ha...@zendesk.com> wrote:

Re: CheckpointConfig says to persist periodic checkpoints, but no checkpoint directory has been configured.

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

I think the GlobalConfiguration is not necessarily read by the (local) JobManager. You could try using StreamExecutionEnvironment.createLocalEnvironment(int, Configuration) to manually specify a configuration.

Best,
Aljoscha
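A minimal Scala sketch of that suggestion, assuming Flink 1.3-era APIs. In the Scala API version used in this thread, `createLocalEnvironment(parallelism)` takes no `Configuration`, so the hypothetical `LocalEnvWithConfig.create` helper wraps the Java `StreamExecutionEnvironment.createLocalEnvironment(int, Configuration)` in the Scala `StreamExecutionEnvironment`. The checkpoint directory and interval are the values from the original post, and the checkpointing calls mirror the original program; whether this alone resolves the error in this exact setup is an assumption, not something verified here.

```scala
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object LocalEnvWithConfig {
  // Hypothetical helper: wraps the Java factory that accepts a Configuration,
  // intended so the embedded local cluster is started with `conf`.
  def create(parallelism: Int, conf: Configuration): StreamExecutionEnvironment =
    new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism, conf))

  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // The key named in the IllegalStateException; path taken from the original post.
    conf.setString("state.checkpoints.dir", "/tmp/flink/checkpoints-meta/")

    // Same default parallelism the Scala createLocalEnvironment uses.
    val env = create(JavaEnv.getDefaultLocalParallelism, conf)

    // Same checkpoint settings as the original program.
    env.enableCheckpointing(2 * 60 * 1000)
    env.getCheckpointConfig.enableExternalizedCheckpoints(
      ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    // Sources, transformations and sinks go here, followed by env.execute(...).
  }
}
```

Passing `conf` explicitly sidesteps the open question of whether the local JobManager picks up flink-conf.yaml on its own; only the `getExecutionEnvironment` call in the program changes.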

> On 26. Sep 2017, at 05:49, Hao Sun <ha...@zendesk.com> wrote:
> 
> Hi, I am running Flink in dev mode through IntelliJ. I have flink-conf.yaml correctly configured, and from the log you can see the JobManager is reading it.
> 
> 2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.backend, rocksdb
> 2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.backend.fs.checkpointdir, /tmp/flink/checkpoints/
> 2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.checkpoints.dir, /tmp/flink/checkpoints-meta/
> 2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.savepoints.dir, /tmp/flink/savepoints/
> 
> But I still somehow get this error:
> java.lang.IllegalStateException: CheckpointConfig says to persist periodic checkpoints, but no checkpoint directory has been configured. You can configure configure one via key 'state.checkpoints.dir'.
> 	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:209)
> 	at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:451)
> 	at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:278)
> 	at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1315)
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495)
> 	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> 	at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
> 	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> 	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> 	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> 	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> 	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
> 	at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
> 	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
> 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> 	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
> 	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 
> 
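> The only checkpointing-related code in my program is:
> import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
> import org.apache.flink.streaming.api.scala._
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
> env.enableCheckpointing(2 * 60 * 1000) // checkpoint every 2 minutes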
> 
> I need some help digging into this. Thanks
> 
> =================== Full log =================
> 
> 2017-09-25 20:41:51.466 [ForkJoinPool-1-worker-13] INFO  com.zendesk.consul.Consul  - Collecting kafka nodes from Consul(consul.docker:8500) for tags=List(dev)
> 2017-09-25 20:41:51.946 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class com.zendesk.fraud_prevention.data_types.MaxwellFilterEvents does not contain a setter for field events
> 2017-09-25 20:41:51.946 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class com.zendesk.fraud_prevention.data_types.MaxwellFilterEvents is not a valid POJO type because not all fields are valid POJO fields.
> 2017-09-25 20:41:51.985 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class com.zendesk.fraud_prevention.data_types.MaxwellFPSEvent does not contain a setter for field accountId
> 2017-09-25 20:41:51.985 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class com.zendesk.fraud_prevention.data_types.MaxwellFPSEvent is not a valid POJO type because not all fields are valid POJO fields.
> 2017-09-25 20:41:52.017 [ForkJoinPool-1-worker-13] INFO  com.zendesk.consul.Consul  - Collecting kafka nodes from Consul(consul.docker:8500) for tags=List(dev)
> 2017-09-25 20:41:52.198 [main] INFO  o.a.flink.streaming.api.environment.LocalStreamEnvironment  - Running job on local embedded Flink mini cluster
> 2017-09-25 20:41:52.253 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: jobmanager.rpc.address, localhost
> 2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: jobmanager.rpc.port, 6123
> 2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: jobmanager.web.port, 8081
> 2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: jobmanager.heap.mb, 1024
> 2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: taskmanager.heap.mb, 1024
> 2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: taskmanager.memory.preallocate, false
> 2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: taskmanager.numberOfTaskSlots, 1
> 2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: parallelism.default, 1
> 2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: fs.hdfs.hadoopconf, flink/conf
> 2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.backend, rocksdb
> 2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.backend.fs.checkpointdir, /tmp/flink/checkpoints/
> 2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.checkpoints.dir, /tmp/flink/checkpoints-meta/
> 2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.savepoints.dir, /tmp/flink/savepoints/
> 2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: taskmanager.log.path, /tmp/flink_logs/flink_console.log
> 2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: jobmanager.web.log.path, /tmp/flink_logs/flink_console.log
> 2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: high-availability, zookeeper
> 2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: high-availability.zookeeper.quorum, 172.18.0.7:2181
> 2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: high-availability.zookeeper.path.root, /flink
> 2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: high-availability.zookeeper.path.cluster-id, /flink_default_ns
> 2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: high-availability.zookeeper.storageDir, /tmp/flink/ha-recovery
> 2017-09-25 20:41:52.257 [main] INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster  - Disabled queryable state server
> 2017-09-25 20:41:52.271 [main] INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster  - Starting FlinkMiniCluster.
> 2017-09-25 20:41:52.442 [flink-akka.actor.default-dispatcher-4] INFO  akka.event.slf4j.Slf4jLogger  - Slf4jLogger started
> 2017-09-25 20:41:52.472 [main] INFO  org.apache.flink.runtime.blob.BlobServer  - Created BLOB server storage directory /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/blobStore-af52ff47-49d3-4307-bb4e-016c5e5d8a39
> 2017-09-25 20:41:52.477 [main] INFO  org.apache.flink.runtime.blob.BlobServer  - Started BLOB server at 0.0.0.0:56706 - max concurrent requests: 50 - max backlog: 1000
> 2017-09-25 20:41:52.487 [main] INFO  org.apache.flink.runtime.metrics.MetricRegistry  - No metrics reporter configured, no metrics will be exposed/reported.
> 2017-09-25 20:41:52.496 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.jobmanager.MemoryArchivist  - Started memory archivist akka://flink/user/archive_1
> 2017-09-25 20:41:52.501 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.jobmanager.JobManager  - Starting JobManager at akka://flink/user/jobmanager_1.
> 2017-09-25 20:41:52.501 [flink-akka.actor.default-dispatcher-3] INFO  o.a.f.r.h.nonha.embedded.EmbeddedLeaderService  - Proposing leadership to contender org.apache.flink.runtime.jobmanager.JobManager@5b4d1f99 @ akka://flink/user/jobmanager_1
> 2017-09-25 20:41:52.508 [main] INFO  o.apache.flink.runtime.taskexecutor.TaskManagerConfiguration  - Messages have a max timeout of 10000 ms
> 2017-09-25 20:41:52.514 [main] INFO  org.apache.flink.runtime.taskexecutor.TaskManagerServices  - Temporary file directory '/var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T': total 464 GB, usable 61 GB (13.15% usable)
> 2017-09-25 20:41:52.590 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.jobmanager.JobManager  - JobManager akka://flink/user/jobmanager_1 was granted leadership with leader session ID Some(d2f1c68f-2982-474f-8b7f-271d3f4e4192).
> 2017-09-25 20:41:52.592 [flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.h.nonha.embedded.EmbeddedLeaderService  - Received confirmation of leadership for leader akka://flink/user/jobmanager_1 , session=d2f1c68f-2982-474f-8b7f-271d3f4e4192
> 2017-09-25 20:41:52.602 [flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.c.standalone.StandaloneResourceManager  - Trying to associate with JobManager leader akka://flink/user/jobmanager_1
> 2017-09-25 20:41:52.610 [flink-akka.actor.default-dispatcher-3] INFO  o.a.f.r.c.standalone.StandaloneResourceManager  - Resource Manager associating with leading JobManager Actor[akka://flink/user/jobmanager_1#-1948249729] - leader session d2f1c68f-2982-474f-8b7f-271d3f4e4192
> 2017-09-25 20:41:52.899 [main] INFO  org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated 363 MB for network buffer pool (number of memory segments: 11620, bytes per segment: 32768).
> 2017-09-25 20:41:52.915 [main] INFO  org.apache.flink.runtime.io.network.NetworkEnvironment  - Starting the network environment and its components.
> 2017-09-25 20:41:52.917 [main] INFO  org.apache.flink.runtime.taskexecutor.TaskManagerServices  - Limiting managed memory to 1145 MB, memory will be allocated lazily.
> 2017-09-25 20:41:52.922 [main] INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager  - I/O manager uses directory /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/flink-io-698c06ae-3d56-4765-b9b8-8a5d8b70ffd6 for spill files.
> 2017-09-25 20:41:52.923 [main] INFO  org.apache.flink.runtime.metrics.MetricRegistry  - No metrics reporter configured, no metrics will be exposed/reported.
> 2017-09-25 20:41:52.963 [main] INFO  org.apache.flink.runtime.filecache.FileCache  - User file cache uses directory /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/flink-dist-cache-a99429d5-981d-4cd0-9eeb-5d6678c650f0
> 2017-09-25 20:41:52.973 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.filecache.FileCache  - User file cache uses directory /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/flink-dist-cache-dd359949-3837-461c-accc-41ebc67a1d8f
> 2017-09-25 20:41:52.974 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Starting TaskManager actor at akka://flink/user/taskmanager_1#1248014944.
> 2017-09-25 20:41:52.974 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - TaskManager data connection information: d7308d8350e736f55357e74e04f5c106 @ localhost (dataPort=-1)
> 2017-09-25 20:41:52.975 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - TaskManager has 8 task slot(s).
> 2017-09-25 20:41:52.976 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Memory usage stats: [HEAP: 385/684/3641 MB, NON HEAP: 35/36/-1 MB (used/committed/max)]
> 2017-09-25 20:41:52.980 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Trying to register at JobManager akka://flink/user/jobmanager_1 (attempt 1, timeout: 500 milliseconds)
> 2017-09-25 20:41:52.982 [flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.c.standalone.StandaloneResourceManager  - TaskManager d7308d8350e736f55357e74e04f5c106 has started.
> 2017-09-25 20:41:52.984 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.instance.InstanceManager  - Registered TaskManager at localhost (akka://flink/user/taskmanager_1) as 8ad7f698f504d459202cdb8a9d6a9b34. Current number of registered hosts is 1. Current number of alive task slots is 8.
> 2017-09-25 20:41:52.987 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Successful registration at JobManager (akka://flink/user/jobmanager_1), starting network stack and library cache.
> 2017-09-25 20:41:52.989 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Determined BLOB server address to be localhost/127.0.0.1:56706. Starting BLOB cache.
> 2017-09-25 20:41:52.990 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.blob.BlobCache  - Created BLOB cache storage directory /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/blobStore-196b0de7-b951-4074-8865-a138283e66c1
> 2017-09-25 20:41:52.999 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Received SubmitJobAndWait(JobGraph(jobId: 0f0d880310bc9098027c2e4877f999fb)) but there is no connection to a JobManager yet.
> 2017-09-25 20:41:52.999 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Received job Kafka 0.10 Example development (0f0d880310bc9098027c2e4877f999fb).
> 2017-09-25 20:41:53.000 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Disconnect from JobManager null.
> 2017-09-25 20:41:53.003 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Connect to JobManager Actor[akka://flink/user/jobmanager_1#-1948249729].
> 2017-09-25 20:41:53.003 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1948249729] with leader session id d2f1c68f-2982-474f-8b7f-271d3f4e4192.
> 2017-09-25 20:41:53.004 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Sending message to JobManager akka://flink/user/jobmanager_1 to submit job Kafka 0.10 Example development (0f0d880310bc9098027c2e4877f999fb) and wait for progress
> 2017-09-25 20:41:53.005 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Upload jar files to job manager akka://flink/user/jobmanager_1.
> 2017-09-25 20:41:53.007 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Submit job to the job manager akka://flink/user/jobmanager_1.
> 2017-09-25 20:41:53.009 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.jobmanager.JobManager  - Submitting job 0f0d880310bc9098027c2e4877f999fb (Kafka 0.10 Example development).
> 2017-09-25 20:41:53.014 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.jobmanager.JobManager  - Using restart strategy FixedDelayRestartStrategy(maxNumberRestartAttempts=1, delayBetweenRestartAttempts=10000) for 0f0d880310bc9098027c2e4877f999fb.
> 2017-09-25 20:41:53.025 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job recovers via failover strategy: full graph restart
> 2017-09-25 20:41:53.055 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.jobmanager.JobManager  - Running initialization on master for job Kafka 0.10 Example development (0f0d880310bc9098027c2e4877f999fb).
> 2017-09-25 20:41:53.055 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.jobmanager.JobManager  - Successfully ran initialization on master in 0 ms.
> 2017-09-25 20:41:53.071 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.jobmanager.JobManager  - No state backend has been configured, using default state backend (Memory / JobManager)
> 2017-09-25 20:41:53.078 [flink-akka.actor.default-dispatcher-2] ERROR org.apache.flink.runtime.jobmanager.JobManager  - Failed to submit job 0f0d880310bc9098027c2e4877f999fb (Kafka 0.10 Example development)
> java.lang.IllegalStateException: CheckpointConfig says to persist periodic checkpoints, but no checkpoint directory has been configured. You can configure configure one via key 'state.checkpoints.dir'.
> 	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:209)
> 	at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:451)
> 	at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:278)
> 	at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1315)
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495)
> 	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> 	at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
> 	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> 	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> 	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> 	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> 	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
> 	at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
> 	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
> 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> 	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
> 	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2017-09-25 20:41:53.085 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Terminate JobClientActor.
> 2017-09-25 20:41:53.085 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Disconnect from JobManager Actor[akka://flink/user/jobmanager_1#-1948249729].
> 2017-09-25 20:41:53.086 [main] INFO  org.apache.flink.runtime.client.JobClient  - Job execution failed
> 2017-09-25 20:41:53.086 [main] INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster  - Stopping FlinkMiniCluster.
> 2017-09-25 20:41:53.090 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Stopping TaskManager akka://flink/user/taskmanager_1#1248014944.
> 2017-09-25 20:41:53.090 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.jobmanager.JobManager  - Stopping JobManager akka://flink/user/jobmanager_1.
> 2017-09-25 20:41:53.091 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Disassociating from JobManager
> 2017-09-25 20:41:53.093 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.blob.BlobCache  - Shutting down BlobCache
> 2017-09-25 20:41:53.103 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.blob.BlobServer  - Stopped BLOB server at 0.0.0.0:56706
> 2017-09-25 20:41:53.104 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager  - I/O manager removed spill file directory /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/flink-io-698c06ae-3d56-4765-b9b8-8a5d8b70ffd6
> 2017-09-25 20:41:53.104 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.io.network.NetworkEnvironment  - Shutting down the network environment and its components.
> 2017-09-25 20:41:53.122 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Task manager akka://flink/user/taskmanager_1 is completely shut down.