Posted to dev@flink.apache.org by vino yang <ya...@gmail.com> on 2019/12/18 10:46:33 UTC

Re: [Question] How to use different filesystem between checkpoint data and user data sink

Hi ouywl,

>>    Thread.currentThread().getContextClassLoader();

What does this statement mean in your program?

In addition, can you share your implementation of the customized file
system plugin and the related exception?

Best,
Vino

ouywl <ou...@139.com> wrote on Wed, Dec 18, 2019 at 4:59 PM:

> Hi all,
>     We have implemented a filesystem plugin to sink data to hdfs1, while
> the YARN cluster that Flink runs on uses hdfs2. When the job runs, the
> JobManager uses the hdfs1 configuration to create its filesystem, so the
> filesystem plugin conflicts with the Flink components.
>     The steps we implemented:
>       1. 'FileSystemEnhance' extends "FileSystem".
>       2. 'FileSystemFactoryEnhance' implements "FileSystemFactory" and adds
> Kerberos authentication.
>       3. Add a service entry: create the file
> META-INF/services/org.apache.flink.core.fs.FileSystemFactory, which
> contains the class name of "FileSystemFactoryEnhance.class".
>
> And the job's main class is:
>
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.enableCheckpointing(60*1000);
>         env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>         env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>         env.getConfig().enableSysoutLogging();
>
>         Properties props = new Properties();
>         props.put("bootstrap.servers", SERVERS);
>         props.put("group.id", GROUPID);
>         props.put("enable.auto.commit", "true");
>         // props.put("auto.commit.interval.ms", "1000");
>         props.put("session.timeout.ms", "30000");
>         props.put("auto.offset.reset", "latest");
>         props.put("key.deserializer", org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StringDeserializer.class.getName());
>         props.put("value.deserializer", StringDeserializer.class.getName());
>         FlinkKafkaConsumer010 consumer011 = new FlinkKafkaConsumer010<String>("zyf_test_2", new SimpleStringSchema(), props);
>         DataStream<String> source = env.addSource(consumer011).setParallelism(1);
>
>         source.print();
>         Thread.currentThread().getContextClassLoader();
>
>         StreamingFileSink sink = StreamingFileSink
>                 .forRowFormat(new Path("hdfs://bdms-test/user/sloth/zyf"), new SimpleStringEncoder<>("UTF-8"))
>                 .build();
>
>         source.addSink(sink);
>
>         env.execute();
>     }
>
> When the job starts, the JobManager's filesystem fails. The log shows that
> the JobManager uses the "FileSystemFactoryEnhance" filesystem, which
> conflicts.
>
> According to
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/#pluggable-file-systems
> how can we avoid using "Thread.currentThread().getContextClassLoader()"?
>
>
> ouywl
> ouywl@139.com
>
>
>

Re: [Question] How to use different filesystem between checkpoint data and user data sink

Posted by Piotr Nowojski <pi...@ververica.com>.
Hi,

Can you share the full stack trace or just attach job manager and task managers logs? This exception should have had some cause logged below.

Piotrek

> On 19 Dec 2019, at 04:06, ouywl <ou...@139.com> wrote:
> 
> Hi Piotr Nowojski,
>    I have moved my filesystem plugin to FLINK_HOME/plugins in Flink 1.9.1. The JobManager does not start up, and it loads the filesystem plugin from my own plugin jar. The log is:
>   “2019-12-19 10:58:32,394 WARN org.apache.flink.configuration.Configuration - Config uses deprecated configuration key 'high-availability.zookeeper.storageDir' instead of proper key 'high-availability.storageDir'
> 2019-12-19 10:58:32,398 INFO  com.filesystem.plugin.FileSystemFactoryEnhance                -  trying to get hadoopEnv, hadoopPath = /conf/hadoop_conf
> 2019-12-19 10:58:32,434 WARN  org.apache.hadoop.conf.Configuration                          - /tmp/mammut-core-site.xml:an attempt to override final parameter: fs.defaultFS;  Ignoring.
> 2019-12-19 10:58:32,436 WARN  org.apache.hadoop.conf.Configuration                          - /tmp/mammut-hdfs-site.xml:an attempt to override final parameter: dfs.datanode.data.dir;  Ignoring.
> 2019-12-19 10:58:32,436 WARN  org.apache.hadoop.conf.Configuration                          - /tmp/mammut-hdfs-site.xml:an attempt to override final parameter: dfs.datanode.failed.volumes.tolerated;  Ignoring.
> 2019-12-19 10:58:32,436 WARN  org.apache.hadoop.conf.Configuration                          - /tmp/mammut-hdfs-site.xml:an attempt to override final parameter: dfs.namenode.name.dir;  Ignoring.
> 2019-12-19 10:58:32,878 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Shutting YarnJobClusterEntrypoint down with application status FAILED. Diagnostics java.io.IOException: Could not create FileSystem for highly available storage (high-availability.storageDir)
> 	at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:119)
> 	at org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:92)
> 	at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:120)
> 	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:292)
> 	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:257)
> 	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:202)
> 	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:164)
> 	at java.security.AccessController.doPrivileged(Native Method)
> 	at javax.security.auth.Subject.doAs(Subject.java:422)
> 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
> 	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> 	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:163)
> 	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:501) 
>  at org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:102)"
> 
> 	
> ouywl
> ouywl@139.com
> On 12/19/2019 00:01, Piotr Nowojski <pi...@ververica.com> wrote:
> Hi,
> 
> As Yang Wang pointed out, you should use the new plugins mechanism.
> 
> If it doesn’t work, first make sure that you are shipping/distributing the plugin jars correctly, with the correct plugins directory structure on the client machine. Next, make sure that the cluster has the same correct setup. This is especially true for the standalone/cluster execution modes. For YARN, Mesos and Docker the plugins dir should be shipped to the cluster by Flink itself; however, plugin support in YARN is currently semi-broken [1]. This is already fixed, but waiting to be released in 1.9.2 and 1.10.
> 
> If it still doesn’t work, check the TaskManager logs to see which plugins/file systems are being loaded during startup. If none, that's the problem.
> 
> Piotrek
> 
> [1] https://issues.apache.org/jira/browse/FLINK-14382
> 
>> On 18 Dec 2019, at 12:40, Yang Wang <danrtsey.wy@gmail.com> wrote:
>> 
>> You could try the new plugin mechanism.
>> Create a new directory named "myhdfs" under $FLINK_HOME/plugins, and then put your filesystem-related jars in it.
>> Different plugins will be loaded by separate classloaders to avoid conflicts.
>> 
>> 
>> Best,
>> Yang
>> 
> 
> 
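
The layout Yang Wang describes in the quoted text above (one sub-directory per plugin under $FLINK_HOME/plugins, each loaded by its own classloader) can be modelled with the JDK alone. The sketch below is only an illustration under assumptions: `PluginDirSketch` and `loadPlugins` are hypothetical names, not Flink's plugin manager, and the jar it creates is an empty placeholder file.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PluginDirSketch {

    /**
     * Hypothetical helper: builds one URLClassLoader per sub-directory of
     * pluginsRoot, containing only the jars found in that sub-directory.
     * Sub-directories without jars and plain files in the root are skipped.
     */
    static Map<String, URLClassLoader> loadPlugins(Path pluginsRoot, ClassLoader parent)
            throws IOException {
        Map<String, URLClassLoader> loaders = new TreeMap<>();
        try (DirectoryStream<Path> dirs =
                Files.newDirectoryStream(pluginsRoot, p -> Files.isDirectory(p))) {
            for (Path dir : dirs) {
                List<URL> jars = new ArrayList<>();
                try (DirectoryStream<Path> files = Files.newDirectoryStream(dir, "*.jar")) {
                    for (Path jar : files) {
                        jars.add(jar.toUri().toURL());
                    }
                }
                if (!jars.isEmpty()) {
                    // Each plugin directory gets its own loader, so classes in one
                    // plugin are not visible to another plugin's loader.
                    loaders.put(dir.getFileName().toString(),
                            new URLClassLoader(jars.toArray(new URL[0]), parent));
                }
            }
        }
        return loaders;
    }

    public static void main(String[] args) throws IOException {
        // Build a throwaway layout: <root>/myhdfs/<plugin jar> plus a README in the root.
        Path root = Files.createTempDirectory("plugins");
        Path myhdfs = Files.createDirectories(root.resolve("myhdfs"));
        Files.createFile(myhdfs.resolve("my-file-system-plugin-1.0-SNAPSHOT.jar"));
        Files.createFile(root.resolve("README.txt"));

        Map<String, URLClassLoader> loaders =
                loadPlugins(root, PluginDirSketch.class.getClassLoader());
        System.out.println("Plugin directories with their own classloader: " + loaders.keySet());
    }
}
```

The point of the layout is that a jar dropped directly into the plugins root, or onto the main classpath, is not picked up as an isolated plugin in this model; only jars inside a named sub-directory are.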


Re: [Question] How to use different filesystem between checkpoint data and user data sink

Posted by ouywl <ou...@139.com>.
Hi Piotr:

     While debugging the code, I found that the JobManager classpath does not contain “system-plugin.jar”. However, when `configureFileSystems(configuration)` runs in `ClusterEntrypoint.startCluster()`, it initializes the file systems with the plugin through 'FileSystem.initialize(configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));'. After that, when the blob store gets its FileSystem, the FileSystem is looked up by scheme, so it calls “MyFileSystemFactory”, which I implemented myself. I have put core-site.xml and hdfs-site.xml in “system-plugin.jar”, and that configuration is not suitable for ‘hdfs://slothTest/user/sloth/HA/’.
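
The behaviour described above, where a factory is picked purely by URI scheme, can be illustrated with a small self-contained model. This is a sketch under assumptions, not Flink's code: `SchemeLookupSketch`, `Factory`, `register` and `resolve` are hypothetical names, and the strings returned only label which factory was chosen.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

public class SchemeLookupSketch {

    /** Hypothetical stand-in for a file system factory; not Flink's interface. */
    interface Factory {
        String create(URI uri);
    }

    private final Map<String, Factory> factories = new HashMap<>();
    private final Factory fallback;

    SchemeLookupSketch(Factory fallback) {
        this.fallback = fallback;
    }

    /** Registers a factory for one scheme, replacing any earlier registration. */
    void register(String scheme, Factory factory) {
        factories.put(scheme, factory);
    }

    /** Picks the factory by scheme only; the authority (cluster name) is ignored. */
    String resolve(URI uri) {
        Factory factory = factories.getOrDefault(uri.getScheme(), fallback);
        return factory.create(uri);
    }

    public static void main(String[] args) {
        SchemeLookupSketch registry =
                new SchemeLookupSketch(uri -> "default factory handles " + uri);

        // A plugin that claims the "hdfs" scheme receives every hdfs:// URI,
        // including one that points at a different cluster than it was built for.
        registry.register("hdfs", uri -> "plugin factory handles " + uri);

        System.out.println(registry.resolve(URI.create("hdfs://bdms-test/user/sloth/zyf")));
        System.out.println(registry.resolve(URI.create("hdfs://slothTest/user/sloth/HA/")));
    }
}
```

In this model both URIs end up with the plugin factory, which matches the symptom reported here: the high-availability storage path is handed to a factory that carries the other cluster's configuration. Registering the plugin under a scheme of its own would leave hdfs:// with the default factory in the model; whether that is an option in the real setup is an assumption.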

  

The full stack trace is:

  Log Type: jobmanager.log

Log Upload Time: Sun Dec 22 19:09:16 +0800 2019

Log Length: 39600

    
    
    2019-12-22 19:09:12,305 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - --------------------------------------------------------------------------------
    2019-12-22 19:09:12,307 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Starting YarnJobClusterEntrypoint (Version: 1.9.1, Rev:4d56de8, Date:30.09.2019 @ 11:32:19 CST)
    2019-12-22 19:09:12,308 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  OS current user: yarn
    2019-12-22 19:09:12,790 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Current Hadoop/Kerberos user: sloth
    2019-12-22 19:09:12,791 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.152-b16
    2019-12-22 19:09:12,791 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Maximum heap size: 406 MiBytes
    2019-12-22 19:09:12,791 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  JAVA_HOME: /usr/jdk64/jdk1.8.0_152
    2019-12-22 19:09:12,792 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Hadoop version: 2.7.3
    2019-12-22 19:09:12,792 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  JVM Options:
    2019-12-22 19:09:12,793 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Xms424m
    2019-12-22 19:09:12,793 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Xmx424m
    2019-12-22 19:09:12,793 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -XX:+PrintGCDetails
    2019-12-22 19:09:12,793 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -XX:+PrintGCDateStamps
    2019-12-22 19:09:12,793 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Xloggc:/home/sloth/hadoop/yarn/logs/application_1576548114502_0152/container_e70_1576548114502_0152_02_000001/jobmanager-gc.log
    2019-12-22 19:09:12,793 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -XX:+UseGCLogFileRotation
    2019-12-22 19:09:12,794 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -XX:NumberOfGCLogFiles=1
    2019-12-22 19:09:12,794 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -XX:GCLogFileSize=1M
    2019-12-22 19:09:12,794 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Dlog.file=/home/sloth/hadoop/yarn/logs/application_1576548114502_0152/container_e70_1576548114502_0152_02_000001/jobmanager.log
    2019-12-22 19:09:12,794 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Dlogback.configurationFile=file:logback.xml
    2019-12-22 19:09:12,794 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Dlog4j.configuration=file:log4j.properties
    2019-12-22 19:09:12,794 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Program Arguments: (none)
    2019-12-22 19:09:12,795 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Classpath: my-file-system-plugin-1.0-SNAPSHOT.jar:lib/flink-metrics-influxdb_2.11-sloth-flink-1.7.2.jar:lib/flink-shaded-hadoop-2-uber-2.7.5-7.0.jar:lib/flink-table-blink_2.12-1.9.1.jar:lib/flink-table_2.12-1.9.1.jar:lib/guava-19.0.jar:lib/log4j-1.2.17.jar:lib/slf4j-log4j12-1.7.15.jar:log4j.properties:logback.xml:plugins/README.txt:flink.jar:flink-conf.yaml:job.graph::/usr/ndp/current/yarn_nodemanager/conf:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/hadoop-common-2.7.3.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/hadoop-nfs-2.7.3.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/hadoop-common-2.7.3-tests.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/commons-math3-3.1.1.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/zookeeper-3.4.6.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/mockito-all-1.8.5.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/netty-3.6.2.Final.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/jersey-json-1.9.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/commons-beanutils-1.7.0.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/hadoop-annotations-2.7.3.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/jaxb-impl-2.2.3-1.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/commons-beanutils-core-1.8.0.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/htrace-core-3.1.0-incubating.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/commons-cli-1.2.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/jackson-xc-1.9.13.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/jsr305-3.0.0.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/java-xmlbuilder-0.4.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/jetty-6.1.26.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/gson-2.2.4.
jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/xmlenc-0.52.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/commons-configuration-1.6.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/jackson-core-asl-1.9.13.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/commons-io-2.4.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/jersey-core-1.9.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/servlet-api-2.5.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/commons-collections-3.2.2.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/activation-1.1.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/commons-logging-1.1.3.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/jackson-jaxrs-1.9.13.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/xz-1.0.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/commons-codec-1.4.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/hamcrest-core-1.3.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/slf4j-api-1.7.10.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/curator-framework-2.7.1.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/apacheds-kerberos-codec-2.0.0-M15.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/asm-3.2.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/hadoop-lzo-0.4.20.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/jersey-server-1.9.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/commons-net-3.1.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/commons-digester-1.8.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/jackson-mapper-asl-1.9.13.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/commons-lang-2.6.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/httpcore-4.2.5.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/protobuf-java-2.5.0.jar:/usr/ndp/3.3.0/yarn_nodema
nager/share/hadoop/common/lib/jaxb-api-2.2.2.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/jsch-0.1.42.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/jets3t-0.9.0.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/curator-recipes-2.7.1.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/jetty-util-6.1.26.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/jettison-1.1.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/log4j-1.2.17.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/snappy-java-1.0.4.1.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/curator-client-2.7.1.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/junit-4.11.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/commons-httpclient-3.1.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/paranamer-2.3.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/stax-api-1.0-2.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/jsp-api-2.1.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/avro-1.7.4.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/commons-compress-1.4.1.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/api-util-1.0.0-M20.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/hadoop-auth-2.7.3.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/apacheds-i18n-2.0.0-M15.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/httpclient-4.2.5.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/api-asn1-api-1.0.0-M20.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/common/lib/guava-11.0.2.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/hdfs/hadoop-hdfs-2.7.3.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/hdfs/hadoop-hdfs-nfs-2.7.3.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/hdfs/hadoop-hdfs-2.7.3-tests.jar:/usr/ndp/3.3.0/yarn_nodemanag
er/share/hadoop/hdfs/lib/netty-3.6.2.Final.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/hdfs/lib/htrace-core-3.1.0-incubating.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/hdfs/lib/commons-cli-1.2.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/hdfs/lib/xercesImpl-2.9.1.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/hdfs/lib/jsr305-3.0.0.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/hdfs/lib/jetty-6.1.26.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/hdfs/lib/xmlenc-0.52.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/hdfs/lib/jackson-core-asl-1.9.13.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/hdfs/lib/commons-io-2.4.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/hdfs/lib/jersey-core-1.9.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/hdfs/lib/servlet-api-2.5.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/hdfs/lib/commons-logging-1.1.3.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/hdfs/lib/commons-codec-1.4.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/hdfs/lib/asm-3.2.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/hdfs/lib/hadoop-lzo-0.4.20.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/hdfs/lib/jersey-server-1.9.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/hdfs/lib/netty-all-4.0.23.Final.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/hdfs/lib/jackson-mapper-asl-1.9.13.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/hdfs/lib/commons-lang-2.6.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/hdfs/lib/leveldbjni-all-1.8.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/hdfs/lib/protobuf-java-2.5.0.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/hdfs/lib/jetty-util-6.1.26.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/hdfs/lib/log4j-1.2.17.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/hdfs/lib/xml-apis-1.3.04.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/hdfs/lib/guava-11.0.2.jar:/usr/ndp/3.3.0/yarn_nodemanager/share/hadoop/hdfs/lib/commons-daemon-1.0.13.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/hadoop-ya
rn-server-nodemanager-2.7.3.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/hadoop-yarn-server-tests-2.7.3.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/hadoop-yarn-server-sharedcachemanager-2.7.3.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/hadoop-yarn-applications-unmanaged-am-launcher-2.7.3.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/hadoop-yarn-server-resourcemanager-2.7.3.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/hadoop-yarn-api-2.7.3.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/hadoop-yarn-server-common-2.7.3.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/hadoop-yarn-server-applicationhistoryservice-2.7.3.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/hadoop-yarn-common-2.7.3.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/hadoop-yarn-registry-2.7.3.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/hadoop-yarn-client-2.7.3.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/hadoop-yarn-applications-distributedshell-2.7.3.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/hadoop-yarn-server-web-proxy-2.7.3.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/lib/aopalliance-1.0.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/lib/zookeeper-3.4.6.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/lib/netty-3.6.2.Final.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/lib/jersey-json-1.9.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/lib/jaxb-impl-2.2.3-1.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/lib/commons-cli-1.2.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/lib/jackson-xc-1.9.13.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/lib/jsr305-3.0.0.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/lib/jetty-6.1.26.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/lib/jersey-guice-1.9.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/lib/javax.inject-1.jar:/usr/ndp/current/yarn_node
manager/share/hadoop/yarn/lib/jackson-core-asl-1.9.13.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/lib/commons-io-2.4.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/lib/jersey-core-1.9.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/lib/servlet-api-2.5.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/lib/commons-collections-3.2.2.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/lib/activation-1.1.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/lib/commons-logging-1.1.3.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/lib/guice-servlet-3.0.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/lib/javassist-3.18.1-GA.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/lib/jackson-jaxrs-1.9.13.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/lib/xz-1.0.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/lib/zookeeper-3.4.6-tests.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/lib/commons-codec-1.4.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/lib/curator-test-2.7.1.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/lib/commons-math-2.2.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/lib/guice-3.0.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/lib/asm-3.2.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/lib/hadoop-lzo-0.4.20.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/lib/jersey-server-1.9.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/lib/jackson-mapper-asl-1.9.13.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/lib/commons-lang-2.6.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/lib/leveldbjni-all-1.8.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/lib/spark-2.1.0-yarn-shuffle.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/lib/protobuf-java-2.5.0.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/lib/jaxb-api-2.2.2.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/lib/jetty-util-6.1.26.
jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/lib/jettison-1.1.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/lib/log4j-1.2.17.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/lib/curator-client-2.7.1.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/lib/stax-api-1.0-2.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/lib/jersey-client-1.9.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/lib/commons-compress-1.4.1.jar:/usr/ndp/current/yarn_nodemanager/share/hadoop/yarn/lib/guava-11.0.2.jar
    2019-12-22 19:09:12,796 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - --------------------------------------------------------------------------------
    2019-12-22 19:09:12,798 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Registered UNIX signal handlers for [TERM, HUP, INT]
    2019-12-22 19:09:12,801 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - YARN daemon is running as: sloth Yarn client user obtainer: sloth
    2019-12-22 19:09:12,805 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: env.java.opts.jobmanager, -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:<LOG_DIR>/jobmanager-gc.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=1 -XX:GCLogFileSize=1M
    2019-12-22 19:09:12,806 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: restart-strategy.failure-rate.max-failures-per-interval, 5000
    2019-12-22 19:09:12,806 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability.cluster-id, application_1576548114502_0152
    2019-12-22 19:09:12,806 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: metrics.reporter.influxdb.password, ******
    2019-12-22 19:09:12,807 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend.rocksdb.predefined-options, FLASH_SSD_OPTIMIZED
    2019-12-22 19:09:12,807 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: FLINK_PLUGINS_DIR, /home/sloth/flink-1.9.1/plugins
    2019-12-22 19:09:12,807 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend.rocksdb.compaction.level.max-size-level-base, 536870912
    2019-12-22 19:09:12,807 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability.zookeeper.path.root, /sloth-flink
    2019-12-22 19:09:12,807 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: sql.resource.external-buffer.memory.mb, 64
    2019-12-22 19:09:12,808 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: metrics.reporter.influxdb.connectTimeout, 100000
    2019-12-22 19:09:12,808 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend.rocksdb.timer-service.factory, ROCKSDB
    2019-12-22 19:09:12,808 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend.rocksdb.metrics.mem-table-flush-pending, true
    2019-12-22 19:09:12,808 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.memory.off-heap, true
    2019-12-22 19:09:12,808 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: metrics.reporters, influxdb
    2019-12-22 19:09:12,808 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend.rocksdb.metrics.num-deletes-imm-mem-tables, true
    2019-12-22 19:09:12,809 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend.rocksdb.block.cache-size, 536870912
    2019-12-22 19:09:12,809 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: restart-strategy.failure-rate.failure-rate-interval, 6 min
    2019-12-22 19:09:12,809 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: metrics.reporter.influxdb.class, org.apache.flink.metrics.influxdb.InfluxdbReporter
    2019-12-22 19:09:12,809 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend.rocksdb.compaction.level.use-dynamic-size, true
    2019-12-22 19:09:12,809 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.size, 1024m
    2019-12-22 19:09:12,810 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: yarn.per-job-cluster.include-user-jar, FIRST
    2019-12-22 19:09:12,810 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend.rocksdb.metrics.compaction-pending, true
    2019-12-22 19:09:12,810 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: metrics.reporter.influxdb.username, flink-metrics
    2019-12-22 19:09:12,810 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: yarn.tags, job-id@493,job-type@JAR,kernel-type@8,product@sloth,version@8,sys@sloth
    2019-12-22 19:09:12,810 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend.rocksdb.localdir, /mnt/dfs/1/rocksdb/
    2019-12-22 19:09:12,811 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend.incremental, true
    2019-12-22 19:09:12,811 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.memory.preallocate, false
    2019-12-22 19:09:12,811 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability.zookeeper.storageDir, hdfs://slothTest/user/sloth/HA/
    2019-12-22 19:09:12,811 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend.rocksdb.metrics.num-running-flushes, true
    2019-12-22 19:09:12,811 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: metrics.reporter.influxdb.port, 8091
    2019-12-22 19:09:12,812 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend.rocksdb.metrics.num-entries-active-mem-table, true
    2019-12-22 19:09:12,812 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability.zookeeper.quorum, sloth-test0.dg.163.org:2181,sloth-test1.dg.163.org:2181,sloth-test2.dg.163.org:2181
    2019-12-22 19:09:12,812 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend.rocksdb.metrics.num-entries-imm-mem-tables, true
    2019-12-22 19:09:12,812 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend.rocksdb.metrics.num-running-compactions, true
    2019-12-22 19:09:12,812 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: sql.resource.default.memory.mb, 64
    2019-12-22 19:09:12,813 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend, rocksdb
    2019-12-22 19:09:12,813 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend.rocksdb.metrics.estimate-table-readers-mem, true
    2019-12-22 19:09:12,813 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend.rocksdb.metrics.estimate-pending-compaction-bytes, true
    2019-12-22 19:09:12,813 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: metrics.reporter.influxdb.host, sloth-tsdb0.dg.163.org,sloth-tsdb1.dg.163.org,sloth-tsdb2.dg.163.org
    2019-12-22 19:09:12,813 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: sql.resource.sink.default.memory.mb, 128
    2019-12-22 19:09:12,814 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend.rocksdb.metrics.background-errors, true
    2019-12-22 19:09:12,814 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: metrics.reporter.influxdb.filter, (.*)numRecordsIn(.*),(.*)numRecordsOut(.*),(.*)Latency,(.*)records_consumed_rate,(.*)records_lag_max,(.*)fullRestarts(.*),(.*)uptime,(.*)numRunningJobs,(.*)numRegisteredTaskManagers,(.*)taskSlots(.*),(.*)Checkpoint(.*),(.*)Status_JVM(.*)
    2019-12-22 19:09:12,814 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: metrics.reporter.influxdb.db, flink-test-V2
    2019-12-22 19:09:12,814 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend.rocksdb.thread.num, 2
    2019-12-22 19:09:12,814 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: yarn.maximum-failed-containers, 36000
    2019-12-22 19:09:12,815 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.network.numberOfBuffers, 4096
    2019-12-22 19:09:12,815 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: yarn.application-attempts, 36000
    2019-12-22 19:09:12,815 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend.rocksdb.metrics.num-deletes-active-mem-table, true
    2019-12-22 19:09:12,815 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 1
    2019-12-22 19:09:12,815 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend.rocksdb.metrics.num-snapshots, true
    2019-12-22 19:09:12,815 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: sql.resource.source.default.memory.mb, 128
    2019-12-22 19:09:12,815 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend.rocksdb.metrics.estimate-live-data-size, true
    2019-12-22 19:09:12,816 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend.fs.checkpointdir, hdfs://slothTest/user/sloth/sloth-fs-checkpoints/cpk/1_7
    2019-12-22 19:09:12,816 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: restart-strategy.failure-rate.delay, 1 s
    2019-12-22 19:09:12,816 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: blob.storage.directory, /mnt/dfs/0/yarn/local/blobJars/
    2019-12-22 19:09:12,816 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.memory.size, 1024
    2019-12-22 19:09:12,816 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend.rocksdb.metrics.cur-size-active-mem-table, true
    2019-12-22 19:09:12,816 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend.rocksdb.metrics.num-immutable-mem-table, true
    2019-12-22 19:09:12,817 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend.rocksdb.metrics.size-all-mem-tables, true
    2019-12-22 19:09:12,817 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend.rocksdb.metrics.total-sst-files-size, true
    2019-12-22 19:09:12,817 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend.rocksdb.compaction.level.target-file-size-base, 33554432
    2019-12-22 19:09:12,817 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend.rocksdb.metrics.cur-size-all-mem-tables, true
    2019-12-22 19:09:12,817 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend.rocksdb.metrics.estimate-num-keys, true
    2019-12-22 19:09:12,817 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: akka.framesize, 1048576000b
    2019-12-22 19:09:12,818 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend.rocksdb.metrics.actual-delayed-write-rate, true
    2019-12-22 19:09:12,818 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: internal.cluster.execution-mode, DETACHED
    2019-12-22 19:09:12,818 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: metrics.reporter.influxdb.writeTimeout, 10000
    2019-12-22 19:09:12,818 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability, zookeeper
    2019-12-22 19:09:12,818 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: metrics.reporter.influxdb.consistency, ANY
    2019-12-22 19:09:12,819 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: restart-strategy, failure-rate
    2019-12-22 19:09:12,819 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend.rocksdb.metrics.num-live-versions, true
    2019-12-22 19:09:12,819 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: env.java.opts.taskmanager, -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:<LOG_DIR>/taskmanager-gc.log  -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=1 -XX:GCLogFileSize=5M
    2019-12-22 19:09:12,819 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend.rocksdb.writebuffer.size, 134217728
    2019-12-22 19:09:12,819 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.size, 4096m
    2019-12-22 19:09:12,819 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.checkpoints.dir, hdfs://slothTest/user/sloth/sloth-fs-checkpoints/meta/1_7
    2019-12-22 19:09:12,820 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: akka.client.timeout, 600 s
    2019-12-22 19:09:12,846 WARN  org.apache.flink.configuration.Configuration                  - Config uses deprecated configuration key 'web.port' instead of proper key 'rest.bind-port'
    2019-12-22 19:09:12,851 INFO  org.apache.flink.runtime.clusterframework.BootstrapTools      - Setting directories for temporary files to: /mnt/dfs/0/yarn/local/usercache/sloth/appcache/application_1576548114502_0152,/mnt/dfs/1/yarn/local/usercache/sloth/appcache/application_1576548114502_0152,/home/sloth/hadoop/yarn/local/usercache/sloth/appcache/application_1576548114502_0152
    2019-12-22 19:09:12,866 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Starting YarnJobClusterEntrypoint.
    2019-12-22 19:09:12,867 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Install default filesystem.
    2019-12-22 19:09:12,967 INFO  org.apache.flink.runtime.security.modules.HadoopModule        - Hadoop user set to sloth (auth:SIMPLE)
    2019-12-22 19:09:12,982 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Initializing cluster services.
    2019-12-22 19:09:13,446 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils         - Trying to start actor system at sloth-test1.dg.163.org:0
    2019-12-22 19:09:14,207 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started
    2019-12-22 19:09:14,239 INFO  akka.remote.Remoting                                          - Starting remoting
    2019-12-22 19:09:14,425 INFO  akka.remote.Remoting                                          - Remoting started; listening on addresses :[akka.tcp://flink@sloth-test1.dg.163.org:16847]
    2019-12-22 19:09:14,590 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils         - Actor system started at akka.tcp://flink@sloth-test1.dg.163.org:16847
    2019-12-22 19:09:14,599 WARN  org.apache.flink.configuration.Configuration                  - Config uses deprecated configuration key 'high-availability.zookeeper.storageDir' instead of proper key 'high-availability.storageDir'
    2019-12-22 19:09:14,603 INFO  com.filesystem.plugin.MyFileSystemFactory                     -  trying to get hadoopEnv, hadoopPath = /conf/hadoop_conf
    2019-12-22 19:09:14,877 INFO  org.apache.hadoop.security.UserGroupInformation               - Login successful for user mammut_qa/dev@BDMS.163.COM using keytab file /tmp/zyf_test/mammut_qa.keytab
    2019-12-22 19:09:15,123 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Shutting YarnJobClusterEntrypoint down with application status FAILED. Diagnostics java.io.IOException: Could not create FileSystem for highly available storage (high-availability.storageDir)
    	at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:119)
    	at org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:92)
    	at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:120)
    	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:292)
    	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:257)
    	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:202)
    	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:164)
    	at java.security.AccessController.doPrivileged(Native Method)
    	at javax.security.auth.Subject.doAs(Subject.java:422)
    	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
    	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:163)
    	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:501)
    	at org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:102)
    Caused by: java.lang.IllegalArgumentException: java.net.UnknownHostException: slothTest
    	at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:378)
    	at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:310)
    	at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
    	at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:678)
    	at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:619)
    	at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)
    	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
    	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
    	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
    	at org.apache.hadoop.fs.FileSystem$Cache.getUnique(FileSystem.java:2691)
    	at org.apache.hadoop.fs.FileSystem.newInstance(FileSystem.java:420)
    	at com.filesystem.plugin.MyFileSystemKerberosMammutFactory3.getFileSystem(MyFileSystemKerberosMammutFactory3.java:63)
    	at com.filesystem.plugin.MyFileSystemKerberosMammutFactory3.create(MyFileSystemKerberosMammutFactory3.java:38)
    	at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:438)
    	at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:359)
    	at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
    	at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:116)
    	... 13 more
    Caused by: java.net.UnknownHostException: slothTest
    	... 30 more
    .
    2019-12-22 19:09:15,130 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Stopping Akka RPC service.
    2019-12-22 19:09:15,142 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Shutting down remote daemon.
    2019-12-22 19:09:15,144 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remote daemon shut down; proceeding with flushing remote transports.
    2019-12-22 19:09:15,172 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remoting shut down.
    2019-12-22 19:09:15,207 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Stopped Akka RPC service.
    2019-12-22 19:09:15,208 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Could not start cluster entrypoint YarnJobClusterEntrypoint.
    org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to initialize the cluster entrypoint YarnJobClusterEntrypoint.
    	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:182)
    	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:501)
    	at org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:102)
    Caused by: java.io.IOException: Could not create FileSystem for highly available storage (high-availability.storageDir)
    	at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:119)
    	at org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:92)
    	at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:120)
    	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:292)
    	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:257)
    	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:202)
    	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:164)
    	at java.security.AccessController.doPrivileged(Native Method)
    	at javax.security.auth.Subject.doAs(Subject.java:422)
    	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
    	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:163)
    	... 2 more
    Caused by: java.lang.IllegalArgumentException: java.net.UnknownHostException: slothTest
    	at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:378)
    	at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:310)
    	at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
    	at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:678)
    	at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:619)
    	at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)
    	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
    	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
    	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
    	at org.apache.hadoop.fs.FileSystem$Cache.getUnique(FileSystem.java:2691)
    	at org.apache.hadoop.fs.FileSystem.newInstance(FileSystem.java:420)
    	at com.filesystem.plugin.MyFileSystemKerberosMammutFactory3.getFileSystem(MyFileSystemKerberosMammutFactory3.java:63)
    	at com.filesystem.plugin.MyFileSystemKerberosMammutFactory3.create(MyFileSystemKerberosMammutFactory3.java:38)
    	at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:438)
    	at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:359)
    	at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
    	at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:116)
    	... 13 more
    Caused by: java.net.UnknownHostException: slothTest
    	... 30 more

  

ouywl <ouywl@139.com>


  

On 12/19/2019 11:06,[ouywl<ou...@139.com>](mailto:ouywl@139.com) wrote:

> Hi Piotr Nowojski,

>

>    I have moved my filesystem plugin to FLINK_HOME/plugins in Flink 1.9.1.
The JobManager doesn't start up, and it loads the filesystem plugin from my own
plugin jar. The log is:

>

>   “2019-12-19 10:58:32,394 WARN  org.apache.flink.configuration.Configuration - Config uses deprecated configuration key 'high-availability.zookeeper.storageDir' instead of proper key 'high-availability.storageDir'

>     2019-12-19 10:58:32,398 INFO  **com.filesystem.plugin.FileSystemFactoryEnhance** - trying to get hadoopEnv, hadoopPath = /conf/hadoop_conf

>     2019-12-19 10:58:32,434 WARN  org.apache.hadoop.conf.Configuration - /tmp/mammut-core-site.xml:an attempt to override final parameter: fs.defaultFS;  Ignoring.

>     2019-12-19 10:58:32,436 WARN  org.apache.hadoop.conf.Configuration - /tmp/mammut-hdfs-site.xml:an attempt to override final parameter: dfs.datanode.data.dir;  Ignoring.

>     2019-12-19 10:58:32,436 WARN  org.apache.hadoop.conf.Configuration - /tmp/mammut-hdfs-site.xml:an attempt to override final parameter: dfs.datanode.failed.volumes.tolerated;  Ignoring.

>     2019-12-19 10:58:32,436 WARN  org.apache.hadoop.conf.Configuration - /tmp/mammut-hdfs-site.xml:an attempt to override final parameter: dfs.namenode.name.dir;  Ignoring.

>     2019-12-19 10:58:32,878 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Shutting YarnJobClusterEntrypoint down with application status FAILED. Diagnostics java.io.IOException: Could not create FileSystem for highly available storage (high-availability.storageDir)

>       at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:119)

>       at org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:92)

>       at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:120)

>       at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:292)

>       at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:257)

>       at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:202)

>       at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:164)

>       at java.security.AccessController.doPrivileged(Native Method)

>       at javax.security.auth.Subject.doAs(Subject.java:422)

>       at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)

>       at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)

>       at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:163)

>       at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:501)

>       at org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:102)"

>

>  
>

>

> ouywl
> ouywl@139.com
>

> On 12/19/2019 00:01, Piotr Nowojski <pi...@ververica.com> wrote:
>
>> Hi,
>>
>> As Yang Wang pointed out, you should use the new plugins mechanism.
>>
>> If it doesn’t work, first make sure that you are shipping/distributing the
>> plugin jars correctly, with the correct plugins directory structure on the
>> client machine. Next, make sure that the cluster has the same correct setup.
>> This is especially true for the standalone/cluster execution modes. For YARN,
>> Mesos, and Docker, the plugins dir should be shipped to the cluster by Flink
>> itself; however, plugin support in YARN is currently semi-broken [1]. This is
>> already fixed, but waiting to be released in 1.9.2 and 1.10.
>>
>> If it still doesn’t work, check the TaskManager logs to see which
>> plugins/file systems are being loaded during startup. If none, that's the
>> problem.
>>
>> Piotrek
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-14382
>>

>>> On 18 Dec 2019, at 12:40, Yang Wang <danrtsey.wy@gmail.com> wrote:
>>>
>>> You could try the new plugin mechanism.
>>>
>>> Create a new directory named "myhdfs" under $FLINK_HOME/plugins, and then
>>> put your filesystem-related jars in it.
>>>
>>> Different plugins will be loaded by separate classloaders to avoid
>>> conflicts.
>>>
>>> Best,
>>>
>>> Yang
>>>

>>> vino yang <yanghua1127@gmail.com> wrote on Wed, Dec 18, 2019 at 6:46 PM:
>>>
>>>> Hi ouywl,
>>>>
>>>> >>    Thread.currentThread().getContextClassLoader();
>>>>
>>>> What does this statement mean in your program?
>>>>
>>>> In addition, can you share your implementation of the customized file
>>>> system plugin and the related exception?
>>>>
>>>> Best,
>>>>
>>>> Vino
>>>>

>>>> ouywl <ouywl@139.com> wrote on Wed, Dec 18, 2019 at 4:59 PM:
>>>>
>>>>> Hi all,
>>>>>
>>>>>     We have implemented a filesystem plugin to sink data to hdfs1, while
>>>>> the YARN cluster that Flink runs on uses hdfs2. When the job runs, the
>>>>> JobManager uses the configuration of hdfs1 to create its filesystem, so
>>>>> the filesystem plugin conflicts with the Flink components.
>>>>>
>>>>>     Our implementation steps:
>>>>>
>>>>>       1. 'FileSystemEnhance' extends "FileSystem".
>>>>>       2. 'FileSystemFactoryEnhance' implements "FileSystemFactory" and
>>>>>          adds Kerberos authentication.
>>>>>       3. Add a service entry: create a file
>>>>>          META-INF/services/org.apache.flink.core.fs.FileSystemFactory that
>>>>>          contains the class name of "FileSystemFactoryEnhance".
>>>>>
>>>>> The job main class is:
>>>>>
>>>>>    “ public static void main(String[] args) throws Exception{
>>>>>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>     env.enableCheckpointing(60*1000);
>>>>>     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>>>>     env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>>>>     env.getConfig().enableSysoutLogging();
>>>>>
>>>>>     Properties props = new Properties();
>>>>>     props.put("bootstrap.servers", SERVERS);
>>>>>     props.put("group.id", GROUPID);
>>>>>     props.put("enable.auto.commit", "true");
>>>>>     // props.put("auto.commit.interval.ms", "1000");
>>>>>     props.put("session.timeout.ms", "30000");
>>>>>     props.put("auto.offset.reset", "latest");
>>>>>     props.put("key.deserializer", org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StringDeserializer.class.getName());
>>>>>     props.put("value.deserializer", StringDeserializer.class.getName());
>>>>>     FlinkKafkaConsumer010 consumer011 = new FlinkKafkaConsumer010<String>("zyf_test_2", new SimpleStringSchema(), props);
>>>>>     DataStream<String> source = env.addSource(consumer011).setParallelism(1);
>>>>>
>>>>>     source.print();
>>>>>     Thread.currentThread().getContextClassLoader();
>>>>>
>>>>>     StreamingFileSink sink = StreamingFileSink
>>>>>             .forRowFormat(new Path("hdfs://bdms-test/user/sloth/zyf"), new SimpleStringEncoder<>("UTF-8"))
>>>>>             .build();
>>>>>
>>>>>     source.addSink(sink);
>>>>>
>>>>>     env.execute();
>>>>> }”
>>>>>
>>>>> When the job starts, the JobManager fails with a filesystem error; the
>>>>> log indicates that the JobManager uses the "FileSystemFactoryEnhance"
>>>>> filesystem, which causes the conflict.
>>>>>
>>>>> According to the docs at
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/#pluggable-file-systems
>>>>> how can we avoid using "Thread.currentThread().getContextClassLoader()"?
>>>>>
>>>>> ouywl
>>>>> ouywl@139.com
>>>>>


Re: [Question] How to use different filesystem between checkpointdata and user data sink

Posted by ouywl <ou...@139.com>.
Hi Piotr Nowojski,

   I have moved my filesystem plugin to FLINK_HOME/plugins in Flink 1.9.1. The
JobManager does not start up, and it loads the filesystem plugin from my own
plugin jar. The log is:

    “2019-12-19 10:58:32,394 WARN  org.apache.flink.configuration.Configuration                  - Config uses deprecated configuration key 'high-availability.zookeeper.storageDir' instead of proper key 'high-availability.storageDir'
    2019-12-19 10:58:32,398 INFO  **com.filesystem.plugin.FileSystemFactoryEnhance**               -  trying to get hadoopEnv, hadoopPath = /conf/hadoop_conf
    2019-12-19 10:58:32,434 WARN  org.apache.hadoop.conf.Configuration                          - /tmp/mammut-core-site.xml:an attempt to override final parameter: fs.defaultFS;  Ignoring.
    2019-12-19 10:58:32,436 WARN  org.apache.hadoop.conf.Configuration                          - /tmp/mammut-hdfs-site.xml:an attempt to override final parameter: dfs.datanode.data.dir;  Ignoring.
    2019-12-19 10:58:32,436 WARN  org.apache.hadoop.conf.Configuration                          - /tmp/mammut-hdfs-site.xml:an attempt to override final parameter: dfs.datanode.failed.volumes.tolerated;  Ignoring.
    2019-12-19 10:58:32,436 WARN  org.apache.hadoop.conf.Configuration                          - /tmp/mammut-hdfs-site.xml:an attempt to override final parameter: dfs.namenode.name.dir;  Ignoring.
    2019-12-19 10:58:32,878 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Shutting YarnJobClusterEntrypoint down with application status FAILED. Diagnostics java.io.IOException: Could not create FileSystem for highly available storage (high-availability.storageDir)
    	at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:119)
    	at org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:92)
    	at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:120)
    	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:292)
    	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:257)
    	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:202)
    	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:164)
    	at java.security.AccessController.doPrivileged(Native Method)
    	at javax.security.auth.Subject.doAs(Subject.java:422)
    	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
    	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:163)
    	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:501)
    	at org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:102)"
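
One possible reading of this trace, sketched under assumptions: the thread does not show which URI scheme FileSystemFactoryEnhance reports. If it reports "hdfs", a lookup that selects a factory by scheme alone would hand every hdfs:// URI to it, including the high-availability storage directory on hdfs2. The toy registry below is not Flink's code; the factory name "DefaultHadoopFactory", the scheme "hdfs1" and the HA path are hypothetical, and which factory actually wins in Flink depends on its loading order (here the last registration simply replaces the earlier one).

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

/** Toy model of scheme-based file system lookup; not Flink's real classes. */
final class SchemeLookupSketch {

    /** Maps a URI scheme to the name of the factory registered for it. */
    private final Map<String, String> factoriesByScheme = new HashMap<>();

    /** Registers a factory; a later registration for the same scheme replaces the earlier one. */
    void register(String scheme, String factoryName) {
        factoriesByScheme.put(scheme, factoryName);
    }

    /** Returns the factory that would serve the URI, chosen by scheme only ("none" if unknown). */
    String factoryFor(URI uri) {
        return factoriesByScheme.getOrDefault(uri.getScheme(), "none");
    }

    public static void main(String[] args) {
        // Case 1: the custom factory claims the same scheme as the default one.
        SchemeLookupSketch shared = new SchemeLookupSketch();
        shared.register("hdfs", "DefaultHadoopFactory");      // hypothetical name
        shared.register("hdfs", "FileSystemFactoryEnhance");  // assumed to report "hdfs"
        // Both the HA storage URI and the sink URI resolve to the same factory.
        System.out.println(shared.factoryFor(URI.create("hdfs://hdfs2/flink/ha")));
        System.out.println(shared.factoryFor(URI.create("hdfs://bdms-test/user/sloth/zyf")));

        // Case 2: the custom factory uses its own scheme, so the lookups stay separate.
        SchemeLookupSketch separate = new SchemeLookupSketch();
        separate.register("hdfs", "DefaultHadoopFactory");
        separate.register("hdfs1", "FileSystemFactoryEnhance"); // hypothetical scheme
        System.out.println(separate.factoryFor(URI.create("hdfs://hdfs2/flink/ha")));
        System.out.println(separate.factoryFor(URI.create("hdfs1://bdms-test/user/sloth/zyf")));
    }
}
```

If this reading applies, giving the custom factory a scheme of its own and using that scheme only in the sink path would keep the HA and checkpoint paths on the default hdfs handling.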

  

ouywl
ouywl@139.com

On 12/19/2019 00:01, Piotr Nowojski <pi...@ververica.com> wrote:

> Hi,
>
> As Yang Wang pointed out, you should use the new plugins mechanism.
>
> If it doesn’t work, first make sure that you are shipping/distributing the
> plugin jars correctly, with the correct plugins directory structure on the
> client machine. Next, make sure that the cluster has the same correct setup.
> This is especially true for the standalone/cluster execution modes. For YARN,
> Mesos, and Docker, the plugins dir should be shipped to the cluster by Flink
> itself; however, plugin support in YARN is currently semi-broken [1]. This is
> already fixed, but waiting to be released in 1.9.2 and 1.10.
>
> If it still doesn’t work, check the TaskManager logs to see which
> plugins/file systems are being loaded during startup. If none, that's the
> problem.
>
> Piotrek
>
> [1] https://issues.apache.org/jira/browse/FLINK-14382
>

>> On 18 Dec 2019, at 12:40, Yang Wang <danrtsey.wy@gmail.com> wrote:
>>
>> You could try the new plugin mechanism.
>>
>> Create a new directory named "myhdfs" under $FLINK_HOME/plugins, and then
>> put your filesystem-related jars in it.
>>
>> Different plugins will be loaded by separate classloaders to avoid conflicts.
>>
>> Best,
>>
>> Yang
>>

>> vino yang <yanghua1127@gmail.com> wrote on Wed, Dec 18, 2019 at 6:46 PM:
>>
>>> Hi ouywl,
>>>
>>> >>    Thread.currentThread().getContextClassLoader();
>>>
>>> What does this statement mean in your program?
>>>
>>> In addition, can you share your implementation of the customized file
>>> system plugin and the related exception?
>>>
>>> Best,
>>>
>>> Vino
>>>

>>> ouywl <ouywl@139.com> wrote on Wed, Dec 18, 2019 at 4:59 PM:
>>>
>>>> Hi all,
>>>>
>>>>     We have implemented a filesystem plugin to sink data to hdfs1, while
>>>> the YARN cluster that Flink runs on uses hdfs2. When the job runs, the
>>>> JobManager uses the configuration of hdfs1 to create its filesystem, so
>>>> the filesystem plugin conflicts with the Flink components.
>>>>
>>>>     Our implementation steps:
>>>>
>>>>       1. 'FileSystemEnhance' extends "FileSystem".
>>>>       2. 'FileSystemFactoryEnhance' implements "FileSystemFactory" and
>>>>          adds Kerberos authentication.
>>>>       3. Add a service entry: create a file
>>>>          META-INF/services/org.apache.flink.core.fs.FileSystemFactory that
>>>>          contains the class name of "FileSystemFactoryEnhance".
>>>>
>>>> The job main class is:
>>>>
>>>>    “ public static void main(String[] args) throws Exception{
>>>>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>     env.enableCheckpointing(60*1000);
>>>>     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>>>     env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>>>     env.getConfig().enableSysoutLogging();
>>>>
>>>>     Properties props = new Properties();
>>>>     props.put("bootstrap.servers", SERVERS);
>>>>     props.put("group.id", GROUPID);
>>>>     props.put("enable.auto.commit", "true");
>>>>     // props.put("auto.commit.interval.ms", "1000");
>>>>     props.put("session.timeout.ms", "30000");
>>>>     props.put("auto.offset.reset", "latest");
>>>>     props.put("key.deserializer", org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StringDeserializer.class.getName());
>>>>     props.put("value.deserializer", StringDeserializer.class.getName());
>>>>     FlinkKafkaConsumer010 consumer011 = new FlinkKafkaConsumer010<String>("zyf_test_2", new SimpleStringSchema(), props);
>>>>     DataStream<String> source = env.addSource(consumer011).setParallelism(1);
>>>>
>>>>     source.print();
>>>>     Thread.currentThread().getContextClassLoader();
>>>>
>>>>     StreamingFileSink sink = StreamingFileSink
>>>>             .forRowFormat(new Path("hdfs://bdms-test/user/sloth/zyf"), new SimpleStringEncoder<>("UTF-8"))
>>>>             .build();
>>>>
>>>>     source.addSink(sink);
>>>>
>>>>     env.execute();
>>>> }”
>>>>
>>>> When the job starts, the JobManager fails with a filesystem error; the
>>>> log indicates that the JobManager uses the "FileSystemFactoryEnhance"
>>>> filesystem, which causes the conflict.
>>>>
>>>> According to the docs at
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/#pluggable-file-systems
>>>> how can we avoid using "Thread.currentThread().getContextClassLoader()"?
>>>>
>>>> ouywl
>>>> ouywl@139.com
>>>>


Re: [Question] How to use different filesystem between checkpoint data and user data sink

Posted by Piotr Nowojski <pi...@ververica.com>.
Hi,

As Yang Wang pointed out, you should use the new plugins mechanism.

If it doesn’t work, first make sure that you are shipping/distributing the plugin jars correctly, with the correct plugins directory structure on the client machine. Next, make sure that the cluster has the same correct setup. This is especially true for the standalone/cluster execution modes. For YARN, Mesos, and Docker, the plugins dir should be shipped to the cluster by Flink itself; however, plugin support in YARN is currently semi-broken [1]. This is already fixed, but waiting to be released in 1.9.2 and 1.10.
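
As a rough aid for the directory-structure check, here is a minimal sketch that assumes the layout described in this thread: one subdirectory per plugin under the plugins directory, with that plugin's jars inside it. The class and method names are made up for illustration, and the default path "plugins" is only a placeholder for $FLINK_HOME/plugins.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper: reports plugin subdirectories that contain no jar file. */
final class PluginLayoutCheck {

    /** Returns the names of the subdirectories of pluginsRoot that hold no *.jar file. */
    static List<String> pluginDirsWithoutJars(Path pluginsRoot) throws IOException {
        List<Path> subDirs;
        try (Stream<Path> children = Files.list(pluginsRoot)) {
            subDirs = children.filter(Files::isDirectory).sorted().collect(Collectors.toList());
        }
        List<String> missing = new ArrayList<>();
        for (Path dir : subDirs) {
            if (!containsJar(dir)) {
                missing.add(dir.getFileName().toString());
            }
        }
        return missing;
    }

    /** True if the directory directly contains at least one file whose name ends in .jar. */
    static boolean containsJar(Path dir) throws IOException {
        try (Stream<Path> files = Files.list(dir)) {
            return files.anyMatch(p -> p.getFileName().toString().endsWith(".jar"));
        }
    }

    public static void main(String[] args) throws IOException {
        // Pass the plugins directory as the first argument; "plugins" is a placeholder default.
        Path root = Paths.get(args.length > 0 ? args[0] : "plugins");
        if (!Files.isDirectory(root)) {
            System.out.println("Not a directory: " + root);
            return;
        }
        System.out.println("Plugin directories without jars: " + pluginDirsWithoutJars(root));
    }
}
```

Running it on both the client machine and a cluster node gives a quick comparison of the two setups.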

If it still doesn’t work, check the TaskManager logs to see which plugins/file systems are being loaded during startup. If none, that's the problem.

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-14382

> On 18 Dec 2019, at 12:40, Yang Wang <da...@gmail.com> wrote:
> 
> You could try the new plugin mechanism.
> Create a new directory named "myhdfs" under $FLINK_HOME/plugins, and then put your filesystem-related jars in it.
> Different plugins will be loaded by separate classloaders to avoid conflicts.
> 
> 
> Best,
> Yang
> 
> vino yang <yanghua1127@gmail.com> wrote on Wed, Dec 18, 2019 at 6:46 PM:
> Hi ouywl,
> 
> >>    Thread.currentThread().getContextClassLoader();
> What does this statement mean in your program?
> 
> In addition, can you share your implementation of the customized file system plugin and the related exception?
> 
> Best,
> Vino
> 
> ouywl <ouywl@139.com> wrote on Wed, Dec 18, 2019 at 4:59 PM:
> Hi all,
>     We have implemented a filesystem plugin to sink data to hdfs1, while the YARN cluster that Flink runs on uses hdfs2. When the job runs, the JobManager uses the configuration of hdfs1 to create its filesystem, so the filesystem plugin conflicts with the Flink components.
>     Our implementation steps:
>       1. 'FileSystemEnhance' extends "FileSystem".
>       2. 'FileSystemFactoryEnhance' implements "FileSystemFactory" and adds Kerberos authentication.
>       3. Add a service entry: create a file META-INF/services/org.apache.flink.core.fs.FileSystemFactory that contains the class name of "FileSystemFactoryEnhance".
>
> The job main class is:
>    “ public static void main(String[] args) throws Exception{
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.enableCheckpointing(60*1000);
>     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>     env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>     env.getConfig().enableSysoutLogging();
> 
> 
>     Properties props = new Properties();
>     props.put("bootstrap.servers", SERVERS);
>     props.put("group.id", GROUPID);
>     props.put("enable.auto.commit", "true");
>     // props.put("auto.commit.interval.ms", "1000");
>     props.put("session.timeout.ms", "30000");
>     props.put("auto.offset.reset", "latest");
>     props.put("key.deserializer", org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StringDeserializer.class.getName());
>     props.put("value.deserializer", StringDeserializer.class.getName());
>     FlinkKafkaConsumer010 consumer011 = new FlinkKafkaConsumer010<String>("zyf_test_2", new SimpleStringSchema(), props);
>     DataStream<String> source = env.addSource(consumer011).setParallelism(1);
> 
>     source.print();
>     Thread.currentThread().getContextClassLoader();
> 
>     StreamingFileSink sink = StreamingFileSink
>             .forRowFormat(new Path("hdfs://bdms-test/user/sloth/zyf"), new SimpleStringEncoder<>("UTF-8"))
>             .build();
> 
>     source.addSink(sink);
> 
>     env.execute();
> }”
> 
> When the job starts, the JobManager fails with a filesystem error; the log indicates that the JobManager uses the "FileSystemFactoryEnhance" filesystem, which causes the conflict.
>
> According to the docs at https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/#pluggable-file-systems how can we avoid using "Thread.currentThread().getContextClassLoader()"?
>
> ouywl
> ouywl@139.com


Re: [Question] How to use different filesystem between checkpoint data and user data sink

Posted by Yang Wang <da...@gmail.com>.
You could try the new plugin mechanism.
Create a new directory named "myhdfs" under $FLINK_HOME/plugins, and then
put your filesystem-related jars in it.
Different plugins will be loaded by separate classloaders to avoid conflicts.
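
To illustrate the idea of one class loader per plugin directory, here is a minimal sketch using only the JDK. It is not Flink's plugin loader, which does more than this; the class name and the placeholder path "plugins" are assumptions for illustration.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Illustration only: builds one URLClassLoader per plugin subdirectory. */
final class PerPluginLoaders {

    /** Returns a loader for each subdirectory of pluginsRoot, keyed by directory name. */
    static Map<String, URLClassLoader> load(Path pluginsRoot, ClassLoader parent) throws IOException {
        List<Path> dirs;
        try (Stream<Path> children = Files.list(pluginsRoot)) {
            dirs = children.filter(Files::isDirectory).collect(Collectors.toList());
        }
        Map<String, URLClassLoader> loaders = new TreeMap<>();
        for (Path dir : dirs) {
            // Each plugin sees only its own jars plus whatever the parent exposes.
            loaders.put(dir.getFileName().toString(), new URLClassLoader(jarUrls(dir), parent));
        }
        return loaders;
    }

    /** Collects the URLs of all *.jar files directly inside the given directory. */
    static URL[] jarUrls(Path dir) throws IOException {
        List<Path> jars;
        try (Stream<Path> files = Files.list(dir)) {
            jars = files.filter(p -> p.getFileName().toString().endsWith(".jar"))
                        .collect(Collectors.toList());
        }
        List<URL> urls = new ArrayList<>();
        for (Path jar : jars) {
            urls.add(jar.toUri().toURL());
        }
        return urls.toArray(new URL[0]);
    }

    public static void main(String[] args) throws IOException {
        // "plugins" is a placeholder; point this at $FLINK_HOME/plugins.
        Path root = Paths.get(args.length > 0 ? args[0] : "plugins");
        if (!Files.isDirectory(root)) {
            System.out.println("Not a directory: " + root);
            return;
        }
        for (Map.Entry<String, URLClassLoader> e : load(root, PerPluginLoaders.class.getClassLoader()).entrySet()) {
            System.out.println(e.getKey() + ": " + e.getValue().getURLs().length + " jar(s)");
            e.getValue().close();
        }
    }
}
```

Because the jars under "myhdfs" are visible only to their own loader, a class with the same name in another plugin directory or in the job jar is a different class at runtime, which is how the conflict is avoided.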


Best,
Yang

vino yang <ya...@gmail.com> wrote on Wed, Dec 18, 2019 at 6:46 PM:

> Hi ouywl,
>
> *>>    Thread.currentThread().getContextClassLoader();*
>
> What does this statement mean in your program?
>
> In addition, can you share your implementation of the customized file
> system plugin and the related exception?
>
> Best,
> Vino
>
> ouywl <ou...@139.com> wrote on Wed, Dec 18, 2019 at 4:59 PM:
>
>> Hi all,
>>     We have implemented a filesystem plugin to sink data to hdfs1, while
>> the YARN cluster that Flink runs on uses hdfs2. When the job runs, the
>> JobManager uses the configuration of hdfs1 to create its filesystem, so
>> the filesystem plugin conflicts with the Flink components.
>>     Our implementation steps:
>>       1. 'FileSystemEnhance' extends "FileSystem".
>>       2. 'FileSystemFactoryEnhance' implements "FileSystemFactory" and adds
>>          Kerberos authentication.
>>       3. Add a service entry: create a file
>>          META-INF/services/org.apache.flink.core.fs.FileSystemFactory that
>>          contains the class name of "FileSystemFactoryEnhance".
>>
>> The job main class is:
>>    “ public static void main(String[] args) throws Exception{
>>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>     env.enableCheckpointing(60*1000);
>>     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>     env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>     env.getConfig().enableSysoutLogging();
>>
>>     Properties props = new Properties();
>>     props.put("bootstrap.servers", SERVERS);
>>     props.put("group.id", GROUPID);
>>     props.put("enable.auto.commit", "true");
>>     // props.put("auto.commit.interval.ms", "1000");
>>     props.put("session.timeout.ms", "30000");
>>     props.put("auto.offset.reset", "latest");
>>     props.put("key.deserializer", org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StringDeserializer.class.getName());
>>     props.put("value.deserializer", StringDeserializer.class.getName());
>>     FlinkKafkaConsumer010 consumer011 = new FlinkKafkaConsumer010<String>("zyf_test_2", new SimpleStringSchema(), props);
>>     DataStream<String> source = env.addSource(consumer011).setParallelism(1);
>>
>>     source.print();
>>     Thread.currentThread().getContextClassLoader();
>>
>>     StreamingFileSink sink = StreamingFileSink
>>             .forRowFormat(new Path("hdfs://bdms-test/user/sloth/zyf"), new SimpleStringEncoder<>("UTF-8"))
>>             .build();
>>
>>     source.addSink(sink);
>>
>>     env.execute();
>> }”
>>
>> When the job starts, the JobManager fails with a filesystem error; the
>> log indicates that the JobManager uses the "FileSystemFactoryEnhance"
>> filesystem, which causes the conflict.
>>
>> According to the docs at
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/#pluggable-file-systems
>> how can we avoid using "Thread.currentThread().getContextClassLoader()"?
>>
>> ouywl
>> ouywl@139.com

Re: [Question] How to use different filesystem between checkpoint data and user data sink

Posted by Yang Wang <da...@gmail.com>.
You could have a try the new plugin mechanism.
Create a new directory named "myhdfs" under $FLINK_HOME/plugins, and then
put your filesystem related jars in it.
Different plugins will be loaded by separate classloader to avoid conflict.


Best,
Yang

vino yang <ya...@gmail.com> 于2019年12月18日周三 下午6:46写道:

> Hi ouywl,
>
> *>>    Thread.currentThread().getContextClassLoader();*
>
> What does this statement mean in your program?
>
> In addition, can you share your implementation of the customized file
> system plugin and the related exception?
>
> Best,
> Vino
>
> ouywl <ou...@139.com> 于2019年12月18日周三 下午4:59写道:
>
>> Hi all,
>>     We have implemented a filesystem plugin for sink data to hdfs1, and
>> the yarn for flink running is used hdfs2. So when the job running, the
>> jobmanager use the conf of hdfs1 to create filesystem, the filesystem
>> plugin  is conflict with flink component.
>>     We implemeted step:
>>       1.  ‘FileSystemEnhance’ is implement from “FileSystem”
>>       2.  ‘FileSystemFactoryEnhance’ is implement from “FileSystemFactory”,add
>> kerberos auth in ”FileSystemFactoryEnhance"
>>       3. Add a service entry. Create a file
>> META-INF/services/org.apache.flink.core.fs.FileSystemFactory which
>> contains the class name of “ FileSystemFactoryEnhance.class”
>>
>> And  the job mainclass is :
>> public static void main(String[] args) throws Exception {
>>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>     env.enableCheckpointing(60*1000);
>>     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>     env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>     env.getConfig().enableSysoutLogging();
>>     Properties props = new Properties();
>>     props.put("bootstrap.servers", SERVERS);
>>     props.put("group.id", GROUPID);
>>     props.put("enable.auto.commit", "true");
>>     // props.put("auto.commit.interval.ms", "1000");
>>     props.put("session.timeout.ms", "30000");
>>     props.put("auto.offset.reset", "latest");
>>     props.put("key.deserializer", org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StringDeserializer.class.getName());
>>     props.put("value.deserializer", StringDeserializer.class.getName());
>>     FlinkKafkaConsumer010 consumer011 = new FlinkKafkaConsumer010<String>("zyf_test_2", new SimpleStringSchema(), props);
>>     DataStream<String> source = env.addSource(consumer011).setParallelism(1);
>>     source.print();
>>     Thread.currentThread().getContextClassLoader();
>>     StreamingFileSink sink = StreamingFileSink
>>             .forRowFormat(new Path("hdfs://bdms-test/user/sloth/zyf"), new SimpleStringEncoder<>("UTF-8"))
>>             .build();
>>     source.addSink(sink);
>>     env.execute();
>> }
>>
>> When we start the job, the JobManager's filesystem reports an error; the
>> log suggests that the JobManager is using the “FileSystemFactoryEnhance”
>> filesystem, which causes the conflict. Regarding the documentation at
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/#pluggable-file-systems
>> how can we avoid using “Thread.currentThread().getContextClassLoader()”?
>>
>>
>> ouywl
>> ouywl@139.com