You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by yu...@apache.org on 2022/01/11 06:51:49 UTC

[incubator-pegasus] branch master updated: feat(online_migration): part4 - add script to update table ingest_behind (#867)

This is an automated email from the ASF dual-hosted git repository.

yuchenhe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new f687daf  feat(online_migration): part4 - add script to update table ingest_behind (#867)
f687daf is described below

commit f687daf15503667965345e931054c9c5316604e1
Author: HeYuchen <he...@xiaomi.com>
AuthorDate: Tue Jan 11 14:51:41 2022 +0800

    feat(online_migration): part4 - add script to update table ingest_behind (#867)
---
 scripts/pegasus_rebalance_cluster.sh    |   7 -
 scripts/pegasus_update_ingest_behind.sh | 337 ++++++++++++++++++++++++++++++++
 2 files changed, 337 insertions(+), 7 deletions(-)

diff --git a/scripts/pegasus_rebalance_cluster.sh b/scripts/pegasus_rebalance_cluster.sh
index 084c4d3..aae3892 100755
--- a/scripts/pegasus_rebalance_cluster.sh
+++ b/scripts/pegasus_rebalance_cluster.sh
@@ -49,13 +49,6 @@ pwd="$( cd "$( dirname "$0"  )" && pwd )"
 shell_dir="$( cd $pwd/.. && pwd )"
 cd $shell_dir
 
-source ./scripts/minos_common.sh
-find_cluster $cluster
-if [ $? -ne 0 ]; then
-  echo "ERROR: cluster \"$cluster\" not found"
-  exit 1
-fi
-
 echo "UID=$UID"
 echo "PID=$PID"
 echo "Start time: `date`"
diff --git a/scripts/pegasus_update_ingest_behind.sh b/scripts/pegasus_update_ingest_behind.sh
new file mode 100755
index 0000000..61c5382
--- /dev/null
+++ b/scripts/pegasus_update_ingest_behind.sh
@@ -0,0 +1,337 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# Update app ingestion_behind envs using minos.
+#
+
+PID=$$
+
+if [ $# -le 3 ]; then
+  echo "USAGE: $0 <cluster-name> <cluster-meta-list> <app_name> <ingestion_behind>"
+  echo "For example:"
+  echo "  $0 onebox 127.0.0.1:34601,127.0.0.1:34602 temp false"
+  echo
+  exit 1
+fi
+
+function find_app()
+{
+  meta_list=$1
+  app_name=$2
+  find_app="false"
+
+  echo ls | ./run.sh shell --cluster $meta_list &>/tmp/$UID.$PID.pegasus.update_ingestion_behind.ls
+  while read app_line
+  do
+    status=`echo ${app_line} | awk '{print $2}'`
+    name=`echo ${app_line} | awk '{print $3}'`
+    if [ "${app_name}" != "$name" ]; then
+        continue
+    fi
+    if [ "$status" == "AVAILABLE" ]; then
+        find_app="true"
+        break
+    fi
+  done </tmp/$UID.$PID.pegasus.update_ingestion_behind.ls
+  echo "$find_app"
+}
+
+
+function get_app_node_count()
+{
+  meta_list=$1
+  app_name=$2
+  node=$3
+  primary=$4
+  
+  pri_count=0
+  total_count=0
+
+  echo "app $app_name -d" | ./run.sh shell --cluster $meta_list &>/tmp/$UID.$PID.pegasus.update_ingestion_behind.$app
+  while read line
+  do
+    node_addr=`echo $line | awk '{print $1}'`
+    if [ "$node_addr" = "$node" ]; then
+      pri_count=`echo $line | awk '{print $2}'`
+      total_count=`echo $line | awk '{print $4}'`
+      break
+    fi
+  done </tmp/$UID.$PID.pegasus.update_ingestion_behind.$app
+
+  if [ "${primary}" == "true" ]; then
+    echo "$pri_count"
+  else
+    echo "$total_count"
+  fi
+}
+
+cluster=$1
+meta_list=$2
+app_name=$3
+ingestion_behind=$4
+if [ "$ingestion_behind" != "true" -a "$ingestion_behind" != "false" ]; then
+  echo "ERROR: invalid ingestion_behind, should be true or false"
+  exit 1
+fi
+
+start_task_id=0
+rebalance_cluster_after_rolling=true
+rebalance_only_move_primary=false
+
+echo "UID=$UID"
+echo "PID=$PID"
+echo "Start time: `date`"
+total_start_time=$((`date +%s`))
+echo
+
+echo "Generating /tmp/$UID.$PID.pegasus.update_ingestion_behind.cluster_info..."
+echo cluster_info | ./run.sh shell --cluster $meta_list 2>&1 | sed 's/ *$//' >/tmp/$UID.$PID.pegasus.update_ingestion_behind.cluster_info
+cname=`grep zookeeper_root /tmp/$UID.$PID.pegasus.update_ingestion_behind.cluster_info | grep -o '/[^/]*$' | grep -o '[^/]*$'`
+if [ "$cname" != "$cluster" ]; then
+  echo "ERROR: cluster name and meta list not matched"
+  exit 1
+fi
+
+app_found=`find_app ${meta_list} ${app_name}`
+if [ "$app_found" != "true" ]; then
+  echo "ERROR: can't find app $app_name in cluster $cluster"
+  exit 1
+fi
+
+pmeta=`grep primary_meta_server /tmp/$UID.$PID.pegasus.update_ingestion_behind.cluster_info | grep -o '[0-9.:]*$'`
+if [ "$pmeta" == "" ]; then
+  echo "ERROR: extract primary_meta_server by shell failed"
+  exit 1
+fi
+
+rs_list_file="/tmp/$UID.$PID.pegasus.update_ingestion_behind.rs.list"
+echo "Generating $rs_list_file..."
+echo nodes | ./run.sh shell --cluster $meta_list &>/tmp/$UID.$PID.pegasus.update_ingestion_behind.nodes
+while read line
+do
+  node_status=`echo $line | awk '{print $2}'`
+  if [ "$node_status" != "ALIVE" ]; then
+    continue
+  else
+    echo $line | awk '{print $1}' >>$rs_list_file
+  fi
+done </tmp/$UID.$PID.pegasus.update_ingestion_behind.nodes
+
+echo "Set meta level to steady..."
+echo "set_meta_level steady" | ./run.sh shell --cluster $meta_list &>/tmp/$UID.$PID.pegasus.update_ingestion_behind.set_meta_level
+set_ok=`grep 'control meta level ok' /tmp/$UID.$PID.pegasus.update_ingestion_behind.set_meta_level | wc -l`
+if [ $set_ok -ne 1 ]; then
+  echo "ERROR: set meta level to steady failed"
+  exit 1
+fi
+
+echo "Set app envs rocksdb.allow_ingest_behind=${ingestion_behind}"
+log_file="/tmp/$UID.$PID.pegasus.set_app_envs.${app_name}"
+echo -e "use ${app_name}\n set_app_envs rocksdb.allow_ingest_behind ${ingestion_behind}" | ./run.sh shell --cluster $meta_list &>${log_file}
+set_fail=`grep 'set app env failed' ${log_file} | wc -l`
+if [ ${set_fail} -eq 1 ]; then
+    echo "ERROR: set app envs failed, refer to ${log_file}"
+    exit 1
+fi
+echo "Sleep 30 seconds to wait app envs working..."
+sleep 30
+
+echo "Set lb.assign_delay_ms to 30min..."
+echo "remote_command -l $pmeta meta.lb.assign_delay_ms 180000000" | ./run.sh shell --cluster $meta_list &>/tmp/$UID.$PID.pegasus.rolling_node.assign_delay_ms
+set_ok=`grep OK /tmp/$UID.$PID.pegasus.rolling_node.assign_delay_ms | wc -l`
+if [ $set_ok -ne 1 ]; then
+  echo "ERROR: set lb.assign_delay_ms to 30min failed"
+  exit 1
+fi
+
+echo
+while read line
+do
+  start_time=$((`date +%s`))
+  node=$line
+  node_ip=`echo ${line} | cut -d ':' -f1`
+  node_name=`getent hosts $node_ip | awk '{print $2}'`
+
+  echo "=================================================================="
+  echo "=================================================================="
+  echo "Closing partitions of [$app_name] on [$node_name] [$node]..."
+  echo
+
+  echo "Getting serving replica count..."
+  serving_replica_count=`echo 'nodes -d' | ./run.sh shell --cluster $meta_list | grep $node | tail -n 1 | awk '{print $3}'`
+  app_serving_replica_count=`get_app_node_count ${meta_list} ${app_name} ${node} 'false'`
+  echo "serving_replica_count=$serving_replica_count"
+  echo "app_serving_replica_count=$app_serving_replica_count"
+  echo
+
+  echo "Set lb.add_secondary_max_count_for_one_node to 0..."
+  echo "remote_command -l $pmeta meta.lb.add_secondary_max_count_for_one_node 0" | ./run.sh shell --cluster $meta_list &>/tmp/$UID.$PID.pegasus.update_ingestion_behind.add_secondary_max_count_for_one_node
+  set_ok=`grep OK /tmp/$UID.$PID.pegasus.update_ingestion_behind.add_secondary_max_count_for_one_node | wc -l`
+  if [ $set_ok -ne 1 ]; then
+    echo "ERROR: set lb.add_secondary_max_count_for_one_node to 0 failed"
+    exit 1
+  fi
+  echo
+
+  echo "Migrating primary replicas out of node..."
+  sleeped=0
+  while true
+  do
+    if [ $((sleeped%10)) -eq 0 ]; then
+      ./run.sh migrate_node -c $meta_list -n $node -a $app_name -t run &>/tmp/$UID.$PID.pegasus.update_ingestion_behind.migrate_node
+      echo "Send migrate propose, refer to /tmp/$UID.$PID.pegasus.update_ingestion_behind.migrate_node for details"
+    fi
+    pri_count=`get_app_node_count ${meta_list} ${app_name} ${node} 'true'`
+    if [ $pri_count -eq 0 ]; then
+      echo "Migrate done."
+      break
+    elif [ $sleeped -gt 28 ]; then
+      echo "Migrate timeout."
+      break
+    else
+      echo "Still $pri_count primary replicas left on $node"
+      sleep 1
+      sleeped=$((sleeped+1))
+    fi
+  done
+  echo
+  sleep 1
+
+  echo "Downgrading replicas on node..."
+  sleeped=0
+  while true
+  do
+    if [ $((sleeped%50)) -eq 0 ]; then
+      ./run.sh downgrade_node -c $meta_list -n $node -a $app_name -t run &>/tmp/$UID.$PID.pegasus.update_ingestion_behind.downgrade_node
+      echo "Send downgrade propose, refer to /tmp/$UID.$PID.pegasus.update_ingestion_behind.downgrade_node for details"
+    fi
+    rep_count=`get_app_node_count ${meta_list} ${app_name} ${node} 'false'`
+    if [ $rep_count -eq 0 ]; then
+      echo "Downgrade done."
+      break
+    elif [ $sleeped -gt 28 ]; then
+      echo "Downgrade timeout."
+      break
+    else
+      echo "Still $rep_count replicas left on $node"
+      sleep 1
+      sleeped=$((sleeped+1))
+    fi
+  done
+  echo
+  sleep 1
+
+  echo "Checking replicas closed on node..."
+  sleeped=0
+  while true
+  do
+    if [ $((sleeped%50)) -eq 0 ]; then
+      echo "Send kill_partition commands to node..."
+      grep '^propose ' /tmp/$UID.$PID.pegasus.update_ingestion_behind.downgrade_node >/tmp/$UID.$PID.pegasus.update_ingestion_behind.downgrade_node.propose
+      while read line2
+      do
+        gpid=`echo $line2 | awk '{print $3}' | sed 's/\./ /'`
+        echo "remote_command -l $node replica.kill_partition $gpid" | ./run.sh shell --cluster $meta_list &>/tmp/$UID.$PID.pegasus.update_ingestion_behind.kill_partition
+      done </tmp/$UID.$PID.pegasus.update_ingestion_behind.downgrade_node.propose
+      echo "Sent to `cat /tmp/$UID.$PID.pegasus.update_ingestion_behind.downgrade_node.propose | wc -l` partitions."
+    fi
+    echo "remote_command -l $node perf-counters '.*replica(Count)'" | ./run.sh shell --cluster $meta_list &>/tmp/$UID.$PID.pegasus.update_ingestion_behind.replica_count_perf_counters
+    serving_count=`grep -o 'replica_stub.replica(Count)","type":"NUMBER","value":[0-9]*' /tmp/$UID.$PID.pegasus.update_ingestion_behind.replica_count_perf_counters | grep -o '[0-9]*$'`
+    opening_count=`grep -o 'replica_stub.opening.replica(Count)","type":"NUMBER","value":[0-9]*' /tmp/$UID.$PID.pegasus.update_ingestion_behind.replica_count_perf_counters | grep -o '[0-9]*$'`
+    closing_count=`grep -o 'replica_stub.closing.replica(Count)","type":"NUMBER","value":[0-9]*' /tmp/$UID.$PID.pegasus.update_ingestion_behind.replica_count_perf_counters | grep -o '[0-9]*$'`
+    if [ "$serving_count" = "" -o "$opening_count" = "" -o "$closing_count" = "" ]; then
+      echo "ERROR: extract replica count from perf counters failed"
+      exit 1
+    fi
+    rep_count=$((serving_count + opening_count + closing_count + app_serving_replica_count - serving_replica_count))
+    if [ $rep_count -eq 0 ]; then
+      echo "Close done."
+      break
+    elif [ $sleeped -gt 28 ]; then
+      echo "Close timeout."
+      break
+    else
+      echo "Still $rep_count replicas not closed on $node"
+      sleep 1
+      sleeped=$((sleeped+1))
+    fi
+  done
+  echo
+  sleep 1
+
+  echo "remote_command -l $node flush-log" | ./run.sh shell --cluster $meta_list &>/dev/null
+
+  echo "Set lb.add_secondary_max_count_for_one_node to 100..."
+  echo "remote_command -l $pmeta meta.lb.add_secondary_max_count_for_one_node 100" | ./run.sh shell --cluster $meta_list &>/tmp/$UID.$PID.pegasus.update_ingestion_behind.add_secondary_max_count_for_one_node
+  set_ok=`grep OK /tmp/$UID.$PID.pegasus.update_ingestion_behind.add_secondary_max_count_for_one_node | wc -l`
+  if [ $set_ok -ne 1 ]; then
+    echo "ERROR: set lb.add_secondary_max_count_for_one_node to 100 failed"
+    exit 1
+  fi
+
+  echo "Wait cluster to become healthy..."
+  while true
+  do
+    unhealthy_count=`echo "ls -d" | ./run.sh shell --cluster $meta_list | awk 'f{ if(NF<7){f=0} else if($3!=$4){print} } / fully_healthy /{f=1}' | wc -l`
+    if [ $unhealthy_count -eq 0 ]; then
+      echo "Cluster becomes healthy."
+      break
+    else
+      sleep 1
+    fi
+  done
+  echo
+  sleep 1
+
+  finish_time=$((`date +%s`))
+  echo "Updating ingestion_behind=$ingestion_behind for all partitions of app [$app_name] on [$node_name] [$node] done."
+  echo "Elapsed time is $((finish_time - start_time)) seconds."
+  echo
+done <$rs_list_file
+
+echo "=================================================================="
+echo "=================================================================="
+
+echo "Set lb.add_secondary_max_count_for_one_node to DEFAULT..."
+echo "remote_command -l $pmeta meta.lb.add_secondary_max_count_for_one_node DEFAULT" | ./run.sh shell --cluster $meta_list &>/tmp/$UID.$PID.pegasus.update_ingestion_behind.add_secondary_max_count_for_one_node
+set_ok=`grep OK /tmp/$UID.$PID.pegasus.update_ingestion_behind.add_secondary_max_count_for_one_node | wc -l`
+if [ $set_ok -ne 1 ]; then
+  echo "ERROR: set lb.add_secondary_max_count_for_one_node to DEFAULT failed"
+  exit 1
+fi
+
+echo "Set lb.assign_delay_ms to DEFAULT..."
+echo "remote_command -l $pmeta meta.lb.assign_delay_ms DEFAULT" | ./run.sh shell --cluster $meta_list &>/tmp/$UID.$PID.pegasus.rolling_node.assign_delay_ms
+set_ok=`grep OK /tmp/$UID.$PID.pegasus.rolling_node.assign_delay_ms | wc -l`
+if [ $set_ok -ne 1 ]; then
+  echo "ERROR: set lb.assign_delay_ms to DEFAULT failed"
+  exit 1
+fi
+echo
+
+if [ "$rebalance_cluster_after_rolling" == "true" ]; then
+  echo "Start to rebalance cluster..."
+  ./scripts/pegasus_rebalance_cluster.sh $cluster $meta_list $rebalance_only_move_primary
+fi
+
+echo "Finish time: `date`"
+total_finish_time=$((`date +%s`))
+echo "Update $app_name ingestion_behind=$ingestion_behind, elasped time is $((total_finish_time - total_start_time)) seconds."
+
+rm -f /tmp/$UID.$PID.pegasus.* &>/dev/null
+

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org