You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/07/27 16:35:58 UTC
[1/2] flink git commit: [FLINK-7086] Add Flip-6 standalone session
cluster entry point
Repository: flink
Updated Branches:
refs/heads/master 7cf997d16 -> fa16e9a81
[FLINK-7086] Add Flip-6 standalone session cluster entry point
This closes #4272.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9af8036f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9af8036f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9af8036f
Branch: refs/heads/master
Commit: 9af8036f3d42e6f4c2180e254e35faa8777e0292
Parents: 7cf997d
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Jul 6 17:16:54 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jul 27 17:08:19 2017 +0200
----------------------------------------------------------------------
.../StandaloneSessionClusterEntrypoint.java | 74 ++++++++++++++++++++
1 file changed, 74 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9af8036f/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
new file mode 100644
index 0000000..025f128
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
+import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+/**
+ * Entry point for the standalone session cluster.
+ */
+public class StandaloneSessionClusterEntrypoint extends SessionClusterEntrypoint {
+
+ @Override
+ protected ResourceManager<?> createResourceManager(
+ Configuration configuration,
+ ResourceID resourceId,
+ RpcService rpcService,
+ HighAvailabilityServices highAvailabilityServices,
+ HeartbeatServices heartbeatServices,
+ MetricRegistry metricRegistry,
+ FatalErrorHandler fatalErrorHandler) throws Exception {
+ final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
+ final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
+ final ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
+ resourceManagerRuntimeServicesConfiguration,
+ highAvailabilityServices,
+ rpcService.getScheduledExecutor());
+
+ return new StandaloneResourceManager(
+ rpcService,
+ FlinkResourceManager.RESOURCE_MANAGER_NAME,
+ resourceId,
+ resourceManagerConfiguration,
+ highAvailabilityServices,
+ heartbeatServices,
+ resourceManagerRuntimeServices.getSlotManager(),
+ metricRegistry,
+ resourceManagerRuntimeServices.getJobLeaderIdService(),
+ fatalErrorHandler);
+ }
+
+ public static void main(String[] args) {
+ StandaloneSessionClusterEntrypoint entrypoint = new StandaloneSessionClusterEntrypoint();
+
+ entrypoint.startCluster(args);
+ }
+}
[2/2] flink git commit: [FLINK-7098] Adapt startup scripts to start
Flip-6 standalone session cluster
Posted by tr...@apache.org.
[FLINK-7098] Adapt startup scripts to start Flip-6 standalone session cluster
This closes #4262.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fa16e9a8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fa16e9a8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fa16e9a8
Branch: refs/heads/master
Commit: fa16e9a81517d216e7983caf425c84b1e2163b82
Parents: 9af8036
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Jul 5 15:00:59 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jul 27 17:08:58 2017 +0200
----------------------------------------------------------------------
flink-dist/src/main/flink-bin/bin/config.sh | 9 +++++----
flink-dist/src/main/flink-bin/bin/flink-console.sh | 4 ++++
flink-dist/src/main/flink-bin/bin/flink-daemon.sh | 4 ++++
flink-dist/src/main/flink-bin/bin/jobmanager.sh | 12 +++++++++---
flink-dist/src/main/flink-bin/bin/start-cluster.sh | 13 ++++++++++---
flink-dist/src/main/flink-bin/bin/stop-cluster.sh | 8 +++++---
flink-dist/src/main/flink-bin/bin/taskmanager.sh | 2 +-
7 files changed, 38 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fa16e9a8/flink-dist/src/main/flink-bin/bin/config.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh
index 66f0d5b..5882062 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -464,16 +464,17 @@ readSlaves() {
}
# starts or stops TMs on all slaves
-# TMSlaves start|stop
+# TMSlaves start|stop [flip6]
TMSlaves() {
CMD=$1
+ FLIP6=$2
readSlaves
if [ ${SLAVES_ALL_LOCALHOST} = true ] ; then
# all-local setup
for slave in ${SLAVES[@]}; do
- "${FLINK_BIN_DIR}"/taskmanager.sh "${CMD}"
+ "${FLINK_BIN_DIR}"/taskmanager.sh "${CMD}" "${FLIP6}"
done
else
# non-local setup
@@ -481,11 +482,11 @@ TMSlaves() {
command -v pdsh >/dev/null 2>&1
if [[ $? -ne 0 ]]; then
for slave in ${SLAVES[@]}; do
- ssh -n $FLINK_SSH_OPTS $slave -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\" &"
+ ssh -n $FLINK_SSH_OPTS $slave -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\" \"${FLIP6}\" &"
done
else
PDSH_SSH_ARGS="" PDSH_SSH_ARGS_APPEND=$FLINK_SSH_OPTS pdsh -w $(IFS=, ; echo "${SLAVES[*]}") \
- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\""
+ "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\" \"${FLIP6}\""
fi
fi
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fa16e9a8/flink-dist/src/main/flink-bin/bin/flink-console.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/flink-console.sh b/flink-dist/src/main/flink-bin/bin/flink-console.sh
index 560c1de..574376c 100644
--- a/flink-dist/src/main/flink-bin/bin/flink-console.sh
+++ b/flink-dist/src/main/flink-bin/bin/flink-console.sh
@@ -46,6 +46,10 @@ case $SERVICE in
CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
;;
+ (standalonesession)
+ CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
+ ;;
+
(*)
echo "Unknown service '${SERVICE}'. $USAGE."
exit 1
http://git-wip-us.apache.org/repos/asf/flink/blob/fa16e9a8/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/flink-daemon.sh b/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
index 254c3c4..6985776 100644
--- a/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
+++ b/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
@@ -50,6 +50,10 @@ case $DAEMON in
CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer
;;
+ (standalonesession)
+ CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
+ ;;
+
(*)
echo "Unknown daemon '${DAEMON}'. $USAGE."
exit 1
http://git-wip-us.apache.org/repos/asf/flink/blob/fa16e9a8/flink-dist/src/main/flink-bin/bin/jobmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/jobmanager.sh b/flink-dist/src/main/flink-bin/bin/jobmanager.sh
index f88b165..78e2016 100755
--- a/flink-dist/src/main/flink-bin/bin/jobmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/jobmanager.sh
@@ -18,13 +18,19 @@
################################################################################
# Start/stop a Flink JobManager.
-USAGE="Usage: jobmanager.sh ((start|start-foreground) (local|cluster) [host] [webui-port])|stop|stop-all"
+USAGE="Usage: jobmanager.sh ((start|start-foreground) (local|cluster|flip6) [host] [webui-port])|stop|stop-all [flip6]"
STARTSTOP=$1
EXECUTIONMODE=$2
HOST=$3 # optional when starting multiple instances
WEBUIPORT=$4 # optional when starting multiple instances
+JOBMANAGER_TYPE=jobmanager
+
+if [[ "$EXECUTIONMODE" == "flip6" ]]; then
+ JOBMANAGER_TYPE=standalonesession
+fi
+
if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
echo $USAGE
exit 1
@@ -76,7 +82,7 @@ if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
fi
if [[ $STARTSTOP == "start-foreground" ]]; then
- exec "${FLINK_BIN_DIR}"/flink-console.sh jobmanager "${args[@]}"
+ exec "${FLINK_BIN_DIR}"/flink-console.sh $JOBMANAGER_TYPE "${args[@]}"
else
- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP jobmanager "${args[@]}"
+ "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $JOBMANAGER_TYPE "${args[@]}"
fi
http://git-wip-us.apache.org/repos/asf/flink/blob/fa16e9a8/flink-dist/src/main/flink-bin/bin/start-cluster.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-cluster.sh b/flink-dist/src/main/flink-bin/bin/start-cluster.sh
index b660f6b..465dab8 100755
--- a/flink-dist/src/main/flink-bin/bin/start-cluster.sh
+++ b/flink-dist/src/main/flink-bin/bin/start-cluster.sh
@@ -22,6 +22,13 @@ bin=`cd "$bin"; pwd`
. "$bin"/config.sh
+FLIP6=$1
+CLUSTER_TYPE=cluster
+
+if [[ "$FLIP6" == "flip6" ]]; then
+ CLUSTER_TYPE=flip6
+fi
+
# Start the JobManager instance(s)
shopt -s nocasematch
if [[ $HIGH_AVAILABILITY == "zookeeper" ]]; then
@@ -33,16 +40,16 @@ if [[ $HIGH_AVAILABILITY == "zookeeper" ]]; then
for ((i=0;i<${#MASTERS[@]};++i)); do
master=${MASTERS[i]}
webuiport=${WEBUIPORTS[i]}
- ssh -n $FLINK_SSH_OPTS $master -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/jobmanager.sh\" start cluster ${master} ${webuiport} &"
+ ssh -n $FLINK_SSH_OPTS $master -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/jobmanager.sh\" start $CLUSTER_TYPE ${master} ${webuiport} &"
done
else
echo "Starting cluster."
# Start single JobManager on this machine
- "$FLINK_BIN_DIR"/jobmanager.sh start cluster
+ "$FLINK_BIN_DIR"/jobmanager.sh start $CLUSTER_TYPE
fi
shopt -u nocasematch
# Start TaskManager instance(s)
-TMSlaves start
+TMSlaves start $FLIP6
http://git-wip-us.apache.org/repos/asf/flink/blob/fa16e9a8/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/stop-cluster.sh b/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
index e04d2fa..cd243da 100755
--- a/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
+++ b/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
@@ -22,8 +22,10 @@ bin=`cd "$bin"; pwd`
. "$bin"/config.sh
+FLIP6=$1
+
# Stop TaskManager instance(s)
-TMSlaves stop
+TMSlaves stop $FLIP6
# Stop JobManager instance(s)
shopt -s nocasematch
@@ -32,10 +34,10 @@ if [[ $HIGH_AVAILABILITY == "zookeeper" ]]; then
readMasters
for master in ${MASTERS[@]}; do
- ssh -n $FLINK_SSH_OPTS $master -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/jobmanager.sh\" stop &"
+ ssh -n $FLINK_SSH_OPTS $master -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/jobmanager.sh\" stop \"${FLIP6}\" &"
done
else
- "$FLINK_BIN_DIR"/jobmanager.sh stop
+ "$FLINK_BIN_DIR"/jobmanager.sh stop "${FLIP6}"
fi
shopt -u nocasematch
http://git-wip-us.apache.org/repos/asf/flink/blob/fa16e9a8/flink-dist/src/main/flink-bin/bin/taskmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/taskmanager.sh b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
index 7509ac4..3cf74ca 100755
--- a/flink-dist/src/main/flink-bin/bin/taskmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
@@ -18,7 +18,7 @@
################################################################################
# Start/stop a Flink TaskManager.
-USAGE="Usage: taskmanager.sh (start|start-foreground|stop|stop-all) (flip6)"
+USAGE="Usage: taskmanager.sh (start|start-foreground|stop|stop-all) [flip6]"
STARTSTOP=$1
TYPE=taskmanager