You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bigtop.apache.org by gu...@apache.org on 2022/09/09 10:30:45 UTC

[bigtop] branch master updated: BIGTOP-3741:Add Flink support for Bigtop 3.1.0 Mpack (#993)

This is an automated email from the ASF dual-hosted git repository.

guyuqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bigtop.git


The following commit(s) were added to refs/heads/master by this push:
     new 30ef6dfb BIGTOP-3741:Add Flink support for Bigtop 3.1.0 Mpack (#993)
30ef6dfb is described below

commit 30ef6dfbfcd63daf4a9f2b6e39e0a46b5306ed2e
Author: yaolei <le...@163.com>
AuthorDate: Fri Sep 9 18:30:38 2022 +0800

    BIGTOP-3741:Add Flink support for Bigtop 3.1.0 Mpack (#993)
    
    BIGTOP-3741:Add Flink support for Bigtop 3.1.0 Mpack
    * improve role_command_order.json
    * add log4j configuration
    * improve flink service check
---
 .../stacks/BGTP/1.0/services/FLINK/alerts.json     |  32 ++
 .../services/FLINK/configuration/flink-conf.xml    | 397 +++++++++++++++++++++
 .../1.0/services/FLINK/configuration/flink-env.xml |  75 ++++
 .../configuration/flink-log4j-cli-properties.xml   | 100 ++++++
 .../flink-log4j-console-properties.xml             | 101 ++++++
 .../FLINK/configuration/flink-log4j-properties.xml |  90 +++++
 .../flink-log4j-session-properties.xml             |  75 ++++
 .../stacks/BGTP/1.0/services/FLINK/kerberos.json   |  50 +++
 .../stacks/BGTP/1.0/services/FLINK/metainfo.xml    | 144 ++++++++
 .../services/FLINK/package/scripts/flink_client.py |  47 +++
 .../FLINK/package/scripts/flink_history_server.py  |  88 +++++
 .../FLINK/package/scripts/flink_service.py         |  77 ++++
 .../1.0/services/FLINK/package/scripts/params.py   | 115 ++++++
 .../FLINK/package/scripts/service_check.py         |  46 +++
 .../services/FLINK/package/scripts/setup_flink.py  |  94 +++++
 .../FLINK/package/scripts/status_params.py         |  33 ++
 .../1.0/services/FLINK/quicklinks/quicklinks.json  |  28 ++
 .../1.0/services/FLINK/role_command_order.json     |   7 +
 18 files changed, 1599 insertions(+)

diff --git a/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/alerts.json b/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/alerts.json
new file mode 100644
index 00000000..7f6fb6b5
--- /dev/null
+++ b/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/alerts.json
@@ -0,0 +1,32 @@
+{
+  "FLINK": {
+    "service": [],
+    "FLINK_HISTORYSERVER": [
+      {
+        "name": "FLINK_HISTORYSERVER_PROCESS",
+        "label": "Flink History Server",
+        "description": "This host-level alert is triggered if the Flink History Server cannot be determined to be up.",
+        "interval": 1,
+        "scope": "ANY",
+        "source": {
+          "type": "PORT",
+          "uri": "{{flink-conf/historyserver.web.port}}",
+          "default_port": 8082,
+          "reporting": {
+            "ok": {
+              "text": "TCP OK - {0:.3f}s response on port {1}"
+            },
+            "warning": {
+              "text": "TCP OK - {0:.3f}s response on port {1}",
+              "value": 1.5
+            },
+            "critical": {
+              "text": "Connection failed: {0} to {1}:{2}",
+              "value": 5
+            }
+          }
+        }
+      }
+    ]
+  }
+}
diff --git a/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/configuration/flink-conf.xml b/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/configuration/flink-conf.xml
new file mode 100755
index 00000000..5a85b410
--- /dev/null
+++ b/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/configuration/flink-conf.xml
@@ -0,0 +1,397 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration supports_adding_forbidden="true">
+  <property>
+    <name>jobmanager.archive.fs.dir</name>
+    <value>hdfs:///completed-jobs/</value>
+    <description>Directory for JobManager to store the archives of completed jobs.</description>
+    <on-ambari-upgrade add="true" />
+  </property>
+  <property>
+    <name>historyserver.archive.fs.dir</name>
+    <value>hdfs:///completed-jobs/</value>
+    <description>Comma separated list of directories to fetch archived jobs from.</description>
+    <on-ambari-upgrade add="true" />
+  </property>
+  <property>
+    <name>historyserver.web.port</name>
+    <value>8082</value>
+    <description>The port under which the web-based HistoryServer listens.</description>
+    <on-ambari-upgrade add="true" />
+  </property>
+  <property>
+    <name>historyserver.archive.fs.refresh-interval</name>
+    <value>10000</value>
+    <description>Interval in milliseconds for refreshing the monitored directories.</description>
+    <on-ambari-upgrade add="true" />
+  </property>
+  <property>
+    <name>security.kerberos.login.keytab</name>
+    <description>Flink keytab path</description>
+    <value>none</value>
+    <on-ambari-upgrade add="true"/>
+  </property>
+  <property>
+    <name>security.kerberos.login.principal</name>
+    <description>Flink principal name</description>
+    <property-type>KERBEROS_PRINCIPAL</property-type>
+    <value>none</value>
+    <on-ambari-upgrade add="true"/>
+  </property>
+  <!-- flink-conf.yaml -->
+  <property>
+    <name>content</name>
+    <display-name>flink-conf template</display-name>
+    <description>This is the jinja template for the flink-conf.yaml file</description>
+    <value>
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+
+#==============================================================================
+# Common
+#==============================================================================
+
+# The external address of the host on which the JobManager runs and can be
+# reached by the TaskManagers and any clients which want to connect. This setting
+# is only used in Standalone mode and may be overwritten on the JobManager side
+# by specifying the --host hostname parameter of the bin/jobmanager.sh executable.
+# In high availability mode, if you use the bin/start-cluster.sh script and setup
+# the conf/masters file, this will be taken care of automatically. Yarn
+# automatically configures the host name based on the hostname of the node where the
+# JobManager runs.
+
+jobmanager.rpc.address: localhost
+
+# The RPC port where the JobManager is reachable.
+
+jobmanager.rpc.port: 6123
+
+# The host interface the JobManager will bind to. By default, this is localhost, and will prevent
+# the JobManager from communicating outside the machine/container it is running on.
+# On YARN this setting will be ignored if it is set to 'localhost', defaulting to 0.0.0.0.
+# On Kubernetes this setting will be ignored, defaulting to 0.0.0.0.
+#
+# To enable this, set the bind-host address to one that has access to an outside facing network
+# interface, such as 0.0.0.0.
+
+jobmanager.bind-host: localhost
+
+
+# The total process memory size for the JobManager.
+#
+# Note this accounts for all memory usage within the JobManager process, including JVM metaspace and other overhead.
+
+jobmanager.memory.process.size: 1600m
+
+# The host interface the TaskManager will bind to. By default, this is localhost, and will prevent
+# the TaskManager from communicating outside the machine/container it is running on.
+# On YARN this setting will be ignored if it is set to 'localhost', defaulting to 0.0.0.0.
+# On Kubernetes this setting will be ignored, defaulting to 0.0.0.0.
+#
+# To enable this, set the bind-host address to one that has access to an outside facing network
+# interface, such as 0.0.0.0.
+
+taskmanager.bind-host: localhost
+
+# The address of the host on which the TaskManager runs and can be reached by the JobManager and
+# other TaskManagers. If not specified, the TaskManager will try different strategies to identify
+# the address.
+#
+# Note this address needs to be reachable by the JobManager and forward traffic to one of
+# the interfaces the TaskManager is bound to (see 'taskmanager.bind-host').
+#
+# Note also that unless all TaskManagers are running on the same machine, this address needs to be
+# configured separately for each TaskManager.
+
+taskmanager.host: localhost
+
+# The total process memory size for the TaskManager.
+#
+# Note this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead.
+
+taskmanager.memory.process.size: 1728m
+
+# To exclude JVM metaspace and overhead, please, use total Flink memory size instead of 'taskmanager.memory.process.size'.
+# It is not recommended to set both 'taskmanager.memory.process.size' and Flink memory.
+#
+# taskmanager.memory.flink.size: 1280m
+
+# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
+
+taskmanager.numberOfTaskSlots: 1
+
+# The parallelism used for programs that did not specify any other parallelism.
+
+parallelism.default: 1
+
+# The default file system scheme and authority.
+#
+# By default file paths without scheme are interpreted relative to the local
+# root file system 'file:///'. Use this to override the default and interpret
+# relative paths relative to a different file system,
+# for example 'hdfs://mynamenode:12345'
+#
+# fs.default-scheme
+
+#==============================================================================
+# JVM and Logging Options
+#==============================================================================
+#Java runtime to use
+env.java.home: {{java_home}}
+
+#Path to hadoop configuration directory. It is required to read HDFS and/or YARN configuration.
+#You can also set it via environment variable.
+env.hadoop.conf.dir: {{hadoop_conf_dir}}
+
+#Defines the directory where the flink-&lt;host&gt;-&lt;process&gt;.pid files are saved.
+env.pid.dir: {{flink_pid_dir}}
+
+#==============================================================================
+# High Availability
+#==============================================================================
+
+# The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
+#
+# high-availability: zookeeper
+
+# The path where metadata for master recovery is persisted. While ZooKeeper stores
+# the small ground truth for checkpoint and leader election, this location stores
+# the larger objects, like persisted dataflow graphs.
+#
+# Must be a durable file system that is accessible from all nodes
+# (like HDFS, S3, Ceph, nfs, ...)
+#
+# high-availability.storageDir: hdfs:///flink/ha/
+
+# The list of ZooKeeper quorum peers that coordinate the high-availability
+# setup. This must be a list of the form:
+# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
+#
+# high-availability.zookeeper.quorum: localhost:2181
+
+
+# ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
+# It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE)
+# The default value is "open" and it can be changed to "creator" if ZK security is enabled
+#
+# high-availability.zookeeper.client.acl: open
+
+#==============================================================================
+# Fault tolerance and checkpointing
+#==============================================================================
+
+# The backend that will be used to store operator state checkpoints if
+# checkpointing is enabled. Checkpointing is enabled when execution.checkpointing.interval > 0.
+#
+# Execution checkpointing related parameters. Please refer to CheckpointConfig and ExecutionCheckpointingOptions for more details.
+#
+# execution.checkpointing.interval: 3min
+# execution.checkpointing.externalized-checkpoint-retention: [DELETE_ON_CANCELLATION, RETAIN_ON_CANCELLATION]
+# execution.checkpointing.max-concurrent-checkpoints: 1
+# execution.checkpointing.min-pause: 0
+# execution.checkpointing.mode: [EXACTLY_ONCE, AT_LEAST_ONCE]
+# execution.checkpointing.timeout: 10min
+# execution.checkpointing.tolerable-failed-checkpoints: 0
+# execution.checkpointing.unaligned: false
+#
+# Supported backends are 'hashmap', 'rocksdb', or the
+# &lt;class-name-of-factory&gt;.
+#
+# state.backend: hashmap
+
+# Directory for checkpoints filesystem, when using any of the default bundled
+# state backends.
+#
+# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
+
+# Default target directory for savepoints, optional.
+#
+# state.savepoints.dir: hdfs://namenode-host:port/flink-savepoints
+
+# Flag to enable/disable incremental checkpoints for backends that
+# support incremental checkpoints (like the RocksDB state backend).
+#
+# state.backend.incremental: false
+
+# The failover strategy, i.e., how the job computation recovers from task failures.
+# Only restart tasks that may have been affected by the task failure, which typically includes
+# downstream tasks and potentially upstream tasks if their produced data is no longer available for consumption.
+
+jobmanager.execution.failover-strategy: region
+
+#==============================================================================
+# REST &amp; web frontend
+#==============================================================================
+
+# The port to which the REST client connects. If rest.bind-port has
+# not been specified, then the server will bind to this port as well.
+#
+#rest.port: 8081
+
+# The address to which the REST client will connect
+#
+rest.address: localhost
+
+# Port range for the REST and web server to bind to.
+#
+#rest.bind-port: 8080-8090
+
+# The address that the REST &amp; web server binds to
+# By default, this is localhost, which prevents the REST &amp; web server from
+# being able to communicate outside of the machine/container it is running on.
+#
+# To enable this, set the bind address to one that has access to outside-facing
+# network interface, such as 0.0.0.0.
+#
+rest.bind-address: localhost
+
+# Flag to specify whether job submission is enabled from the web-based
+# runtime monitor. Uncomment to disable.
+
+#web.submit.enable: false
+
+# Flag to specify whether job cancellation is enabled from the web-based
+# runtime monitor. Uncomment to disable.
+
+#web.cancel.enable: false
+
+#==============================================================================
+# Advanced
+#==============================================================================
+
+# Override the directories for temporary files. If not specified, the
+# system-specific Java temporary directory (java.io.tmpdir property) is taken.
+#
+# For framework setups on Yarn, Flink will automatically pick up the
+# containers' temp directories without any need for configuration.
+#
+# Add a delimited list for multiple directories, using the system directory
+# delimiter (colon ':' on unix) or a comma, e.g.:
+#     /data1/tmp:/data2/tmp:/data3/tmp
+#
+# Note: Each directory entry is read from and written to by a different I/O
+# thread. You can include the same directory multiple times in order to create
+# multiple I/O threads against that directory. This is for example relevant for
+# high-throughput RAIDs.
+#
+# io.tmp.dirs: /tmp
+
+# The classloading resolve order. Possible values are 'child-first' (Flink's default)
+# and 'parent-first' (Java's default).
+#
+# Child first classloading allows users to use different dependency/library
+# versions in their application than those in the classpath. Switching back
+# to 'parent-first' may help with debugging dependency issues.
+#
+# classloader.resolve-order: child-first
+
+# The amount of memory going to the network stack. These numbers usually need
+# no tuning. Adjusting them may be necessary in case of an "Insufficient number
+# of network buffers" error. The default min is 64MB, the default max is 1GB.
+#
+# taskmanager.memory.network.fraction: 0.1
+# taskmanager.memory.network.min: 64mb
+# taskmanager.memory.network.max: 1gb
+
+#==============================================================================
+# Flink Cluster Security Configuration
+#==============================================================================
+
+# Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors -
+# may be enabled in four steps:
+# 1. configure the local krb5.conf file
+# 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit)
+# 3. make the credentials available to various JAAS login contexts
+# 4. configure the connector to use JAAS/SASL
+
+# The settings below configure how Kerberos credentials are provided. A keytab will be used instead of
+# a ticket cache if the keytab path and principal are set.
+
+{% if security_enabled %}
+security.kerberos.login.use-ticket-cache: true
+security.kerberos.login.keytab: {{security_kerberos_login_keytab}}
+security.kerberos.login.principal: {{security_kerberos_login_principal}}
+{% else %}
+# security.kerberos.login.use-ticket-cache: true
+# security.kerberos.login.keytab: /path/to/kerberos/keytab
+# security.kerberos.login.principal: flink-user
+{% endif %}
+# The configuration below defines which JAAS login contexts the Kerberos credentials are made available to
+
+# security.kerberos.login.contexts: Client,KafkaClient
+
+#==============================================================================
+# ZK Security Configuration
+#==============================================================================
+
+# Below configurations are applicable if ZK ensemble is configured for security
+
+# Override below configuration to provide custom ZK service name if configured
+# zookeeper.sasl.service-name: zookeeper
+
+# The configuration below must match one of the values set in "security.kerberos.login.contexts"
+# zookeeper.sasl.login-context-name: Client
+
+#==============================================================================
+# HistoryServer
+#==============================================================================
+
+# The HistoryServer is started and stopped via bin/historyserver.sh (start|stop)
+
+# Directory to upload completed jobs to. Add this directory to the list of
+# monitored directories of the HistoryServer as well (see below).
+jobmanager.archive.fs.dir: {{jobmanager_archive_fs_dir}}
+
+# The address under which the web-based HistoryServer listens.
+#historyserver.web.address: 0.0.0.0
+
+# The port under which the web-based HistoryServer listens.
+historyserver.web.port: {{historyserver_web_port}}
+
+# Comma separated list of directories to monitor for completed jobs.
+historyserver.archive.fs.dir: {{historyserver_archive_fs_dir}}
+
+# Interval in milliseconds for refreshing the monitored directories.
+historyserver.archive.fs.refresh-interval: {{historyserver_archive_fs_refresh_interval}}
+    </value>
+    <value-attributes>
+      <type>content</type>
+    </value-attributes>
+    <on-ambari-upgrade add="true"/>
+  </property>
+</configuration>
diff --git a/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/configuration/flink-env.xml b/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/configuration/flink-env.xml
new file mode 100755
index 00000000..dc2cc63f
--- /dev/null
+++ b/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/configuration/flink-env.xml
@@ -0,0 +1,75 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration supports_adding_forbidden="true">
+  <property>
+    <name>flink_user</name>
+    <display-name>Flink User</display-name>
+    <value>flink</value>
+    <property-type>USER</property-type>
+    <description/>
+    <value-attributes>
+      <type>user</type>
+      <overridable>false</overridable>
+      <user-groups>
+        <property>
+          <type>cluster-env</type>
+          <name>user_group</name>
+        </property>
+        <property>
+          <type>flink-env</type>
+          <name>flink_group</name>
+        </property>
+      </user-groups>
+    </value-attributes>
+  </property>
+  <property>
+    <name>flink_group</name>
+    <display-name>Flink Group</display-name>
+    <value>flink</value>
+    <property-type>GROUP</property-type>
+    <description>flink group</description>
+    <value-attributes>
+      <type>user</type>
+    </value-attributes>
+    <on-ambari-upgrade add="true"/>
+  </property>
+  <property>
+    <name>flink_log_dir</name>
+    <value>/var/log/flink</value>
+    <display-name>Flink Log Dir Prefix</display-name>
+    <description>Log Directories for Flink.</description>
+    <value-attributes>
+      <type>directory</type>
+      <overridable>false</overridable>
+    </value-attributes>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>flink_pid_dir</name>
+    <display-name>Flink PID directory</display-name>
+    <value>/var/run/flink</value>
+    <value-attributes>
+      <type>directory</type>
+    </value-attributes>
+    <on-ambari-upgrade add="true"/>
+  </property>
+</configuration>
diff --git a/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/configuration/flink-log4j-cli-properties.xml b/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/configuration/flink-log4j-cli-properties.xml
new file mode 100755
index 00000000..cc1bd902
--- /dev/null
+++ b/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/configuration/flink-log4j-cli-properties.xml
@@ -0,0 +1,100 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration supports_final="false" supports_adding_forbidden="true">
+  <property>
+    <name>content</name>
+    <description>Flink-log4j-cli-Properties</description>
+    <value>
+#########################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Allows this configuration to be modified at runtime. The file will be checked every 30 seconds.
+monitorInterval=30
+
+rootLogger.level = INFO
+rootLogger.appenderRef.file.ref = FileAppender
+
+# Log all infos in the given file
+appender.file.name = FileAppender
+appender.file.type = FILE
+appender.file.append = false
+appender.file.fileName = ${sys:log.file}
+appender.file.layout.type = PatternLayout
+appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+# Log output from org.apache.flink.yarn to the console. This is used by the
+# CliFrontend class when using a per-job YARN cluster.
+logger.yarn.name = org.apache.flink.yarn
+logger.yarn.level = INFO
+logger.yarn.appenderRef.console.ref = ConsoleAppender
+logger.yarncli.name = org.apache.flink.yarn.cli.FlinkYarnSessionCli
+logger.yarncli.level = INFO
+logger.yarncli.appenderRef.console.ref = ConsoleAppender
+logger.hadoop.name = org.apache.hadoop
+logger.hadoop.level = INFO
+logger.hadoop.appenderRef.console.ref = ConsoleAppender
+
+# Make sure hive logs go to the file.
+logger.hive.name = org.apache.hadoop.hive
+logger.hive.level = INFO
+logger.hive.additivity = false
+logger.hive.appenderRef.file.ref = FileAppender
+
+# Log output from org.apache.flink.kubernetes to the console.
+logger.kubernetes.name = org.apache.flink.kubernetes
+logger.kubernetes.level = INFO
+logger.kubernetes.appenderRef.console.ref = ConsoleAppender
+
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+# suppress the warning that hadoop native libraries are not loaded (irrelevant for the client)
+logger.hadoopnative.name = org.apache.hadoop.util.NativeCodeLoader
+logger.hadoopnative.level = OFF
+
+# Suppress the irrelevant (wrong) warnings from the Netty channel handler
+logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
+logger.netty.level = OFF
+    </value>
+    <value-attributes>
+      <type>content</type>
+      <show-property-name>false</show-property-name>
+    </value-attributes>
+    <on-ambari-upgrade add="true"/>
+  </property>
+</configuration>
diff --git a/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/configuration/flink-log4j-console-properties.xml b/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/configuration/flink-log4j-console-properties.xml
new file mode 100755
index 00000000..6f724c4f
--- /dev/null
+++ b/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/configuration/flink-log4j-console-properties.xml
@@ -0,0 +1,101 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration supports_final="false" supports_adding_forbidden="true">
+  <property>
+    <name>content</name>
+    <description>Flink-log4j-console-Properties</description>
+    <value>
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Allows this configuration to be modified at runtime. The file will be checked every 30 seconds.
+monitorInterval=30
+
+# This affects logging for both user code and Flink
+rootLogger.level = INFO
+rootLogger.appenderRef.console.ref = ConsoleAppender
+rootLogger.appenderRef.rolling.ref = RollingFileAppender
+
+# Uncomment this if you want to _only_ change Flink's logging
+#logger.flink.name = org.apache.flink
+#logger.flink.level = INFO
+
+# The following lines keep the log level of common libraries/connectors on
+# log level INFO. The root logger does not override this. You have to manually
+# change the log levels here.
+logger.akka.name = akka
+logger.akka.level = INFO
+logger.kafka.name= org.apache.kafka
+logger.kafka.level = INFO
+logger.hadoop.name = org.apache.hadoop
+logger.hadoop.level = INFO
+logger.zookeeper.name = org.apache.zookeeper
+logger.zookeeper.level = INFO
+logger.shaded_zookeeper.name = org.apache.flink.shaded.zookeeper3
+logger.shaded_zookeeper.level = INFO
+
+# Log all infos to the console
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+# Log all infos in the given rolling file
+appender.rolling.name = RollingFileAppender
+appender.rolling.type = RollingFile
+appender.rolling.append = true
+appender.rolling.fileName = ${sys:log.file}
+appender.rolling.filePattern = ${sys:log.file}.%i
+appender.rolling.layout.type = PatternLayout
+appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+appender.rolling.policies.type = Policies
+appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
+appender.rolling.policies.size.size=100MB
+appender.rolling.policies.startup.type = OnStartupTriggeringPolicy
+appender.rolling.strategy.type = DefaultRolloverStrategy
+appender.rolling.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}
+
+# Suppress the irrelevant (wrong) warnings from the Netty channel handler
+logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
+logger.netty.level = OFF
+    </value>
+    <value-attributes>
+      <type>content</type>
+      <show-property-name>false</show-property-name>
+    </value-attributes>
+    <on-ambari-upgrade add="true"/>
+  </property>
+</configuration>
diff --git a/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/configuration/flink-log4j-properties.xml b/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/configuration/flink-log4j-properties.xml
new file mode 100755
index 00000000..7f98d190
--- /dev/null
+++ b/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/configuration/flink-log4j-properties.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration supports_final="false" supports_adding_forbidden="true">
+  <property>
+    <name>content</name>
+    <description>Template for Flink's log4j.properties, the default logging configuration for Flink daemons (JobManager/TaskManager)</description>
+    <value>
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Allows this configuration to be modified at runtime. The file will be checked every 30 seconds.
+monitorInterval=30
+
+# This affects logging for both user code and Flink
+rootLogger.level = INFO
+rootLogger.appenderRef.file.ref = MainAppender
+
+# Uncomment this if you want to _only_ change Flink's logging
+#logger.flink.name = org.apache.flink
+#logger.flink.level = INFO
+
+# The following lines keep the log level of common libraries/connectors on
+# log level INFO. The root logger does not override this. You have to manually
+# change the log levels here.
+logger.akka.name = akka
+logger.akka.level = INFO
+logger.kafka.name= org.apache.kafka
+logger.kafka.level = INFO
+logger.hadoop.name = org.apache.hadoop
+logger.hadoop.level = INFO
+logger.zookeeper.name = org.apache.zookeeper
+logger.zookeeper.level = INFO
+logger.shaded_zookeeper.name = org.apache.flink.shaded.zookeeper3
+logger.shaded_zookeeper.level = INFO
+
+# Log all infos in the given file
+appender.main.name = MainAppender
+appender.main.type = RollingFile
+appender.main.append = true
+appender.main.fileName = ${sys:log.file}
+appender.main.filePattern = ${sys:log.file}.%i
+appender.main.layout.type = PatternLayout
+appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+appender.main.policies.type = Policies
+appender.main.policies.size.type = SizeBasedTriggeringPolicy
+appender.main.policies.size.size = 100MB
+appender.main.policies.startup.type = OnStartupTriggeringPolicy
+appender.main.strategy.type = DefaultRolloverStrategy
+appender.main.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}
+    </value>
+    <value-attributes>
+      <type>content</type>
+      <show-property-name>false</show-property-name>
+    </value-attributes>
+    <on-ambari-upgrade add="true"/>
+  </property>
+</configuration>
diff --git a/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/configuration/flink-log4j-session-properties.xml b/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/configuration/flink-log4j-session-properties.xml
new file mode 100755
index 00000000..4c6a408b
--- /dev/null
+++ b/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/configuration/flink-log4j-session-properties.xml
@@ -0,0 +1,75 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration supports_final="false" supports_adding_forbidden="true">
+  <property>
+    <name>content</name>
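+    <description>Template for Flink's log4j-session.properties, the logging configuration used by the CLI when starting a YARN/Kubernetes session cluster</description>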
+    <value>
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Allows this configuration to be modified at runtime. The file will be checked every 30 seconds.
+monitorInterval=30
+
+rootLogger.level = INFO
+rootLogger.appenderRef.console.ref = ConsoleAppender
+
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+# Suppress the irrelevant (wrong) warnings from the Netty channel handler
+logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
+logger.netty.level = OFF
+logger.zookeeper.name = org.apache.zookeeper
+logger.zookeeper.level = WARN
+logger.shaded_zookeeper.name = org.apache.flink.shaded.zookeeper3
+logger.shaded_zookeeper.level = WARN
+logger.curator.name = org.apache.flink.shaded.org.apache.curator.framework
+logger.curator.level = WARN
+logger.runtimeutils.name= org.apache.flink.runtime.util.ZooKeeperUtils
+logger.runtimeutils.level = WARN
+logger.runtimeleader.name = org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver
+logger.runtimeleader.level = WARN
+    </value>
+    <value-attributes>
+      <type>content</type>
+      <show-property-name>false</show-property-name>
+    </value-attributes>
+    <on-ambari-upgrade add="true"/>
+  </property>
+</configuration>
diff --git a/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/kerberos.json b/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/kerberos.json
new file mode 100644
index 00000000..9715a661
--- /dev/null
+++ b/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/kerberos.json
@@ -0,0 +1,50 @@
+{
+  "services": [
+    {
+      "name": "FLINK",
+      "identities": [
+        {
+          "name": "flink_service_identity",
+          "principal": {
+            "value": "${flink-env/flink_user}${principal_suffix}@${realm}",
+            "type": "user",
+            "local_username": "${flink-env/flink_user}",
+            "configuration": "flink-conf/security.kerberos.login.principal"
+          },
+          "keytab": {
+            "file": "${keytab_dir}/flink.headless.keytab",
+            "configuration": "flink-conf/security.kerberos.login.keytab",
+            "owner": {
+              "name": "${flink-env/flink_user}",
+              "access": "r"
+            },
+            "group": {
+              "name": "${cluster-env/user_group}",
+              "access": "r"
+            }
+          }
+        }
+      ],
+      "components": [
+        {
+          "name": "FLINK_HISTORYSERVER",
+          "identities": [
+            {
+              "name": "flink_historyserver_identity",
+              "reference": "/FLINK/flink_service_identity"
+            }
+          ]
+        },
+        {
+          "name": "FLINK_CLIENT",
+          "identities": [
+            {
+              "name": "flink_client_identity",
+              "reference": "/FLINK/flink_service_identity"
+            }
+          ]
+        }
+      ]
+    }
+  ]
+}
diff --git a/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/metainfo.xml b/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/metainfo.xml
new file mode 100755
index 00000000..69373f91
--- /dev/null
+++ b/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/metainfo.xml
@@ -0,0 +1,144 @@
+<?xml version="1.0"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<metainfo>
+  <schemaVersion>2.0</schemaVersion>
+  <services>
+    <service>
+      <name>FLINK</name>
+      <displayName>Flink</displayName>
+      <comment>Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams</comment>
+      <version>Bigtop+3.1</version>
+      <components>
+        <component>
+          <name>FLINK_HISTORYSERVER</name>
+          <displayName>Flink History Server</displayName>
+          <category>MASTER</category>
+          <cardinality>1+</cardinality>
+          <versionAdvertised>true</versionAdvertised>
+          <dependencies>
+            <dependency>
+              <name>HDFS/HDFS_CLIENT</name>
+              <scope>host</scope>
+              <auto-deploy>
+                <enabled>true</enabled>
+              </auto-deploy>
+            </dependency>
+            <dependency>
+              <name>MAPREDUCE2/MAPREDUCE2_CLIENT</name>
+              <scope>host</scope>
+              <auto-deploy>
+                <enabled>true</enabled>
+              </auto-deploy>
+            </dependency>
+            <dependency>
+              <name>YARN/YARN_CLIENT</name>
+              <scope>host</scope>
+              <auto-deploy>
+                <enabled>true</enabled>
+              </auto-deploy>
+            </dependency>
+          </dependencies>
+          <commandScript>
+            <script>scripts/flink_history_server.py</script>
+            <scriptType>PYTHON</scriptType>
+            <timeout>600</timeout>
+          </commandScript>
+        </component>
+        <component>
+          <name>FLINK_CLIENT</name>
+          <displayName>Flink Client</displayName>
+          <cardinality>1+</cardinality>
+          <versionAdvertised>true</versionAdvertised>
+          <category>CLIENT</category>
+          <commandScript>
+            <script>scripts/flink_client.py</script>
+            <scriptType>PYTHON</scriptType>
+            <timeout>1200</timeout>
+          </commandScript>
+          <configFiles>
+            <configFile>
+              <type>xml</type>
+              <fileName>flink-conf.xml</fileName>
+              <dictionaryName>flink-conf</dictionaryName>
+            </configFile>
+          </configFiles>
+          <dependencies>
+            <dependency>
+              <name>HDFS/HDFS_CLIENT</name>
+              <scope>host</scope>
+              <auto-deploy>
+                <enabled>true</enabled>
+              </auto-deploy>
+            </dependency>
+            <dependency>
+              <name>YARN/YARN_CLIENT</name>
+              <scope>host</scope>
+              <auto-deploy>
+                <enabled>true</enabled>
+              </auto-deploy>
+            </dependency>
+            <dependency>
+              <name>MAPREDUCE2/MAPREDUCE2_CLIENT</name>
+              <scope>host</scope>
+              <auto-deploy>
+                <enabled>true</enabled>
+              </auto-deploy>
+            </dependency>
+          </dependencies>
+        </component>
+      </components>
+      <osSpecifics>
+        <osSpecific>
+          <osFamily>any</osFamily>
+          <packages>
+            <package>
+              <name>flink</name>
+            </package>
+          </packages>
+        </osSpecific>
+      </osSpecifics>
+
+      <quickLinksConfigurations>
+        <quickLinksConfiguration>
+          <fileName>quicklinks.json</fileName>
+          <default>true</default>
+        </quickLinksConfiguration>
+      </quickLinksConfigurations>
+
+      <requiredServices>
+        <service>YARN</service>
+      </requiredServices>
+
+      <configuration-dependencies>
+        <config-type>flink-conf</config-type>
+        <config-type>flink-env</config-type>
+        <config-type>flink-log4j-cli-properties</config-type>
+        <config-type>flink-log4j-console-properties</config-type>
+        <config-type>flink-log4j-properties</config-type>
+        <config-type>flink-log4j-session-properties</config-type>
+      </configuration-dependencies>
+
+      <commandScript>
+        <script>scripts/service_check.py</script>
+        <scriptType>PYTHON</scriptType>
+        <timeout>300</timeout>
+      </commandScript>
+
+    </service>
+  </services>
+</metainfo>
diff --git a/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/package/scripts/flink_client.py b/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/package/scripts/flink_client.py
new file mode 100755
index 00000000..073b96b7
--- /dev/null
+++ b/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/package/scripts/flink_client.py
@@ -0,0 +1,47 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+import os
+
+from resource_management.core.exceptions import ClientComponentHasNoStatus
+from resource_management.core.logger import Logger
+from resource_management.libraries.script import Script
+
+from setup_flink import *
+
+class FlinkClient(Script):
+
+  def pre_install(self, env):
+    import params
+    env.set_params(params)
+
+  def configure(self, env, config_dir=None, upgrade_type=None):
+    import params
+    env.set_params(params)
+    setup_flink(env,"client",upgrade_type=upgrade_type, action = 'config')
+
+  def install(self, env):
+    import params
+    self.install_packages(env)
+    self.configure(env, config_dir=params.flink_config_dir)
+
+  def status(self, env):
+    raise ClientComponentHasNoStatus()
+
+if __name__ == "__main__":
+  FlinkClient().execute()
diff --git a/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/package/scripts/flink_history_server.py b/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/package/scripts/flink_history_server.py
new file mode 100755
index 00000000..0851464f
--- /dev/null
+++ b/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/package/scripts/flink_history_server.py
@@ -0,0 +1,88 @@
+#!/usr/bin/python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+import sys
+import os
+
+from resource_management.libraries.script.script import Script
+from resource_management.libraries.functions import stack_select
+from resource_management.libraries.functions.check_process_status import check_process_status
+from resource_management.libraries.functions.stack_features import check_stack_feature
+from resource_management.libraries.functions.constants import StackFeature
+from resource_management.core.logger import Logger
+from resource_management.core import shell
+from setup_flink import *
+from flink_service import flink_service
+
+class FlinkHistoryServer(Script):
+
+  def install(self, env):
+    import params
+    env.set_params(params)
+    
+    self.install_packages(env)
+    
+  def configure(self, env, upgrade_type=None, config_dir=None):
+    import params
+    env.set_params(params)
+    
+    setup_flink(env, 'historyserver', upgrade_type=upgrade_type, action = 'config')
+    
+  def start(self, env, upgrade_type=None):
+    import params
+    env.set_params(params)
+    
+    self.configure(env)
+    flink_service('historyserver', upgrade_type=upgrade_type, action='start')
+
+  def stop(self, env, upgrade_type=None):
+    import params
+    env.set_params(params)
+    
+    flink_service('historyserver', upgrade_type=upgrade_type, action='stop')
+
+  def status(self, env):
+    import status_params
+    env.set_params(status_params)
+
+    check_process_status(status_params.flink_history_server_pid_file)
+    
+  def pre_upgrade_restart(self, env, upgrade_type=None):
+    import params
+
+    env.set_params(params)
+    if params.version and check_stack_feature(StackFeature.ROLLING_UPGRADE, params.version):
+      Logger.info("Executing Flink History Server Stack Upgrade pre-restart")
+      stack_select.select_packages(params.version)
+
+  def get_log_folder(self):
+    import params
+    return params.flink_log_dir
+  
+  def get_user(self):
+    import params
+    return params.flink_user
+
+  def get_pid_files(self):
+    import status_params
+    return [status_params.flink_history_server_pid_file]
+
+if __name__ == "__main__":
+  FlinkHistoryServer().execute()
diff --git a/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/package/scripts/flink_service.py b/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/package/scripts/flink_service.py
new file mode 100755
index 00000000..f5cbb20c
--- /dev/null
+++ b/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/package/scripts/flink_service.py
@@ -0,0 +1,77 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+import os
+import shutil
+import glob
+
+from resource_management.libraries.script.script import Script
+from resource_management.libraries.resources.hdfs_resource import HdfsResource
+from resource_management.libraries.functions.copy_tarball import copy_to_hdfs, get_tarball_paths
+from resource_management.libraries.functions import format
+from resource_management.core.resources.system import File, Execute
+from resource_management.libraries.functions.version import format_stack_version
+from resource_management.libraries.functions.stack_features import check_stack_feature
+from resource_management.libraries.functions.check_process_status import check_process_status
+from resource_management.libraries.functions.constants import StackFeature
+from resource_management.libraries.functions.show_logs import show_logs
+from resource_management.core.shell import as_sudo
+from resource_management.core.exceptions import ComponentIsNotRunning
+from resource_management.core.logger import Logger
+
+def flink_service(name, upgrade_type=None, action=None):
+  import params
+
+  if action == 'start':
+    if name == 'historyserver':
+      # create flink history directory
+      params.HdfsResource(params.jobmanager_archive_fs_dir,
+                          type="directory",
+                          action="create_on_execute",
+                          owner=params.flink_user,
+                          group=params.user_group,
+                          mode=0777,
+                          recursive_chmod=True
+                          )
+      params.HdfsResource(None, action="execute")
+
+      historyserver_no_op_test = as_sudo(["test", "-f", params.flink_history_server_pid_file]) + " && " + as_sudo(["pgrep", "-F", params.flink_history_server_pid_file])
+      try:
+        Execute(params.flink_history_server_start,
+                user=params.flink_user,
+                environment={'JAVA_HOME': params.java_home},
+                not_if=historyserver_no_op_test)
+      except:
+        show_logs(params.flink_log_dir, user=params.flink_user)
+        raise
+
+  elif action == 'stop':
+    if name == 'historyserver':
+      try:
+        Execute(format('{flink_history_server_stop}'),
+                user=params.flink_user,
+                environment={'JAVA_HOME': params.java_home}
+        )
+      except:
+        show_logs(params.flink_log_dir, user=params.flink_user)
+        raise
+
+      File(params.flink_history_server_pid_file,
+        action="delete"
+      )
\ No newline at end of file
diff --git a/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/package/scripts/params.py b/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/package/scripts/params.py
new file mode 100755
index 00000000..a19ca33d
--- /dev/null
+++ b/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/package/scripts/params.py
@@ -0,0 +1,115 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+import os
+import status_params
+
+from resource_management.libraries.functions import format
+from resource_management.libraries.resources import HdfsResource
+from resource_management.libraries.functions import conf_select
+from resource_management.libraries.functions import stack_select
+from resource_management.libraries.functions import StackFeature
+from resource_management.libraries.functions.stack_features import check_stack_feature
+from resource_management.libraries.functions.version import format_stack_version
+from resource_management.libraries.functions.default import default
+from resource_management.libraries.functions import get_kinit_path
+from resource_management.libraries.functions.get_not_managed_resources import get_not_managed_resources
+from resource_management.libraries.script.script import Script
+
+# server configurations
+config = Script.get_config()
+tmp_dir = Script.get_tmp_dir()
+
+stack_name = default("/clusterLevelParams/stack_name", None)
+stack_root = Script.get_stack_root()
+
+# This is expected to be of the form #.#.#.#
+stack_version_unformatted = config['clusterLevelParams']['stack_version']
+stack_version_formatted = format_stack_version(stack_version_unformatted)
+
+# New Cluster Stack Version that is defined during the RESTART of a Rolling Upgrade
+version = default("/commandParams/version", None)
+java_home = config['ambariLevelParams']['java_home']
+
+# default hadoop parameters
+hadoop_home = stack_select.get_hadoop_dir("home")
+hadoop_bin_dir = stack_select.get_hadoop_dir("bin")
+hadoop_conf_dir = conf_select.get_hadoop_conf_dir()
+dfs_type = default("/clusterLevelParams/dfs_type", "")
+hdfs_user = config['configurations']['hadoop-env']['hdfs_user']
+hdfs_principal_name = config['configurations']['hadoop-env']['hdfs_principal_name']
+hdfs_user_keytab = config['configurations']['hadoop-env']['hdfs_user_keytab']
+default_fs = config['configurations']['core-site']['fs.defaultFS']
+hdfs_site = config['configurations']['hdfs-site']
+hdfs_resource_ignore_file = "/var/lib/ambari-agent/data/.hdfs_resource_ignore"
+# Flink install layout (fixed paths) plus log/pid directories read from flink-env
+flink_etc_dir = "/etc/flink"
+flink_config_dir = "/etc/flink/conf"
+flink_dir = "/usr/lib/flink"
+flink_bin_dir = "/usr/lib/flink/bin"
+flink_log_dir = config['configurations']['flink-env']['flink_log_dir']
+flink_pid_dir = config['configurations']['flink-env']['flink_pid_dir']
+# kinit location, security flag and smoke-test user settings from cluster-env
+kinit_path_local = get_kinit_path(default('/configurations/kerberos-env/executable_search_paths', None))
+security_enabled = config['configurations']['cluster-env']['security_enabled']
+smokeuser = config['configurations']['cluster-env']['smokeuser']
+smokeuser_principal = config['configurations']['cluster-env']['smokeuser_principal_name']
+smoke_user_keytab = config['configurations']['cluster-env']['smokeuser_keytab']
+# Flink service user/group, the flink-conf 'content' template and the user's HDFS directory
+flink_user = config['configurations']['flink-env']['flink_user']
+user_group = config['configurations']['cluster-env']['user_group']
+flink_conf_template = config['configurations']['flink-conf']['content']
+flink_group = config['configurations']['flink-env']['flink_group']
+flink_hdfs_user_dir = format("/user/{flink_user}")
+# log4j templates: the 'content' property of each flink-log4j-* config type
+flink_log4j_cli_properties = config['configurations']['flink-log4j-cli-properties']['content']
+flink_log4j_console_properties = config['configurations']['flink-log4j-console-properties']['content']
+flink_log4j_properties = config['configurations']['flink-log4j-properties']['content']
+flink_log4j_session_properties = config['configurations']['flink-log4j-session-properties']['content']
+# job archive and history server settings read from flink-conf
+jobmanager_archive_fs_dir = config['configurations']['flink-conf']['jobmanager.archive.fs.dir']
+historyserver_archive_fs_dir = config['configurations']['flink-conf']['historyserver.archive.fs.dir']
+historyserver_web_port = config['configurations']['flink-conf']['historyserver.web.port']
+historyserver_archive_fs_refresh_interval = config['configurations']['flink-conf']['historyserver.archive.fs.refresh-interval']
+# history server start/stop commands; start first exports HADOOP_CLASSPATH from `hadoop classpath`
+flink_history_server_start = format("export HADOOP_CLASSPATH=`hadoop classpath`;{flink_dir}/bin/historyserver.sh start")
+flink_history_server_stop = format("{flink_dir}/bin/historyserver.sh stop")
+flink_history_server_pid_file = status_params.flink_history_server_pid_file
+# Kerberos login principal and keytab as stored in flink-conf (security.kerberos.login.*)
+security_kerberos_login_principal = config['configurations']['flink-conf']['security.kerberos.login.principal']
+security_kerberos_login_keytab = config['configurations']['flink-conf']['security.kerberos.login.keytab']
+
+import functools
+# Create a partial function with the common arguments for every HdfsResource call.
+# To create/delete an HDFS directory/file or copy from local, call params.HdfsResource in code.
+HdfsResource = functools.partial(
+  HdfsResource,
+  user=hdfs_user,
+  hdfs_resource_ignore_file = hdfs_resource_ignore_file,
+  security_enabled = security_enabled,
+  keytab = hdfs_user_keytab,
+  kinit_path_local = kinit_path_local,
+  hadoop_bin_dir = hadoop_bin_dir,
+  hadoop_conf_dir = hadoop_conf_dir,
+  principal_name = hdfs_principal_name,
+  hdfs_site = hdfs_site,
+  default_fs = default_fs,
+  immutable_paths = get_not_managed_resources(),
+  dfs_type = dfs_type
+)
\ No newline at end of file
diff --git a/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/package/scripts/service_check.py b/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/package/scripts/service_check.py
new file mode 100755
index 00000000..717ca36a
--- /dev/null
+++ b/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/package/scripts/service_check.py
@@ -0,0 +1,46 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+import os
+
+from resource_management.libraries.functions.format import format
+from resource_management.core.resources import Execute
+from resource_management.libraries.script import Script
+from resource_management.core.resources.system import Directory
+
+class FlinkServiceCheck(Script):
+  def service_check(self, env):
+    import params
+    env.set_params(params)
+
+    if params.security_enabled:
+       flink_kinit_cmd = format("{kinit_path_local} -kt {smoke_user_keytab} {smokeuser_principal}; ")
+       Execute(flink_kinit_cmd, user=params.smokeuser)
+
+    job_cmd_opts= "-m yarn-cluster -yD classloader.check-leaked-classloader=false "
+    run_flink_wordcount_job = format("export HADOOP_CLASSPATH=`hadoop classpath`;{flink_bin_dir}/flink run {job_cmd_opts} {flink_bin_dir}/../examples/batch/WordCount.jar")
+
+    Execute(run_flink_wordcount_job,
+      logoutput=True,
+      environment={'JAVA_HOME':params.java_home,'HADOOP_CONF_DIR': params.hadoop_conf_dir},
+      user=params.smokeuser)
+            
+if __name__ == "__main__":
+  FlinkServiceCheck().execute()
diff --git a/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/package/scripts/setup_flink.py b/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/package/scripts/setup_flink.py
new file mode 100755
index 00000000..75a8b6d1
--- /dev/null
+++ b/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/package/scripts/setup_flink.py
@@ -0,0 +1,94 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+# Python Imports
+import os
+
+# Local Imports
+from resource_management.core.resources.system import Directory, File,Link
+from resource_management.core.source import InlineTemplate
+from ambari_commons.os_family_impl import OsFamilyFuncImpl, OsFamilyImpl
+
+@OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT)
+def setup_flink(env, type, upgrade_type = None, action = None):
+  import params
+
+  Directory(params.flink_pid_dir,
+            owner=params.flink_user,
+            group=params.user_group,
+            mode=0775,
+            create_parents = True
+  )
+
+  Directory(params.flink_etc_dir, mode=0755)
+  Directory(params.flink_config_dir,
+            owner = params.flink_user,
+            group = params.user_group,
+            create_parents = True)
+
+  Directory(params.flink_log_dir,mode=0767)
+  Link(params.flink_dir + '/log',to=params.flink_log_dir)
+
+  if type == 'historyserver' and action == 'config':
+    params.HdfsResource(params.flink_hdfs_user_dir,
+                     type="directory",
+                     action="create_on_execute",
+                     owner=params.flink_user,
+                     mode=0775
+    )
+
+    params.HdfsResource(None, action="execute")
+
+  flink_conf_file_path = os.path.join(params.flink_config_dir, "flink-conf.yaml")
+  File(flink_conf_file_path,
+       owner=params.flink_user,
+       group = params.flink_group,
+       content=InlineTemplate(params.flink_conf_template),
+       mode=0755)
+
+  #create log4j.properties in /etc/conf dir
+  File(os.path.join(params.flink_config_dir, 'log4j.properties'),
+       owner=params.flink_user,
+       group=params.flink_group,
+       content=params.flink_log4j_properties,
+       mode=0644,
+  )
+
+  #create log4j-cli.properties in /etc/conf dir
+  File(os.path.join(params.flink_config_dir, 'log4j-cli.properties'),
+       owner=params.flink_user,
+       group=params.flink_group,
+       content=params.flink_log4j_cli_properties,
+       mode=0644,
+  )
+
+  #create log4j-console.properties in /etc/conf dir
+  File(os.path.join(params.flink_config_dir, 'log4j-console.properties'),
+       owner=params.flink_user,
+       group=params.flink_group,
+       content=params.flink_log4j_console_properties,
+       mode=0644,
+  )
+
+  #create log4j-session.properties in /etc/conf dir
+  File(os.path.join(params.flink_config_dir, 'log4j-session.properties'),
+       owner=params.flink_user,
+       group=params.flink_group,
+       content=params.flink_log4j_session_properties,
+       mode=0644,
+  )
diff --git a/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/package/scripts/status_params.py b/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/package/scripts/status_params.py
new file mode 100755
index 00000000..87bc00b6
--- /dev/null
+++ b/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/package/scripts/status_params.py
@@ -0,0 +1,33 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+from resource_management.libraries.functions.format import format
+from resource_management.libraries.script.script import Script
+from resource_management.libraries.functions.default import default
+
+config = Script.get_config()  # configuration dictionary the values below are read from
+# Flink service account and groups, taken from flink-env and cluster-env.
+flink_user = config['configurations']['flink-env']['flink_user']
+flink_group = config['configurations']['flink-env']['flink_group']
+user_group = config['configurations']['cluster-env']['user_group']
+# PID directory and the history server PID file path built from it and the Flink user.
+flink_pid_dir = config['configurations']['flink-env']['flink_pid_dir']
+flink_history_server_pid_file = format("{flink_pid_dir}/flink-{flink_user}-historyserver.pid")
+stack_name = default("/clusterLevelParams/stack_name", None)
diff --git a/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/quicklinks/quicklinks.json b/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/quicklinks/quicklinks.json
new file mode 100755
index 00000000..4275b48e
--- /dev/null
+++ b/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/quicklinks/quicklinks.json
@@ -0,0 +1,28 @@
+{
+  "name": "default",
+  "description": "default quick links configuration",
+  "configuration": {
+    "protocol":
+    {
+      "type":"HTTP_ONLY"
+    },
+
+    "links": [
+      {
+        "name": "flink_history_server_ui",
+        "label": "Flink History Server UI",
+        "component_name": "FLINK_HISTORYSERVER",
+        "requires_user_name": "false",
+        "url": "%@://%@:%@",
+        "port":{
+          "http_property": "historyserver.web.port",
+          "http_default_port": "8082",
+          "https_property": "historyserver.web.port",
+          "https_default_port": "8082",
+          "regex": "^(\\d+)$",
+          "site": "flink-conf"
+        }
+      }
+    ]
+  }
+}
diff --git a/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/role_command_order.json b/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/role_command_order.json
new file mode 100755
index 00000000..ab32557b
--- /dev/null
+++ b/bigtop-packages/src/common/bigtop-ambari-mpack/bgtp-ambari-mpack/src/main/resources/stacks/BGTP/1.0/services/FLINK/role_command_order.json
@@ -0,0 +1,7 @@
+{
+  "general_deps" : {
+    "_comment" : "dependencies for FLINK",
+    "FLINK_HISTORYSERVER-START": ["NAMENODE-START", "DATANODE-START"],
+    "FLINK_SERVICE_CHECK-SERVICE_CHECK": ["NODEMANAGER-START", "RESOURCEMANAGER-START", "FLINK_HISTORYSERVER-START"]
+  }
+}