Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/08/15 17:45:21 UTC

[GitHub] [beam] chamikaramj commented on a change in pull request #15307: [BEAM-12735] Adding Python XLang examples to the RC validation script

chamikaramj commented on a change in pull request #15307:
URL: https://github.com/apache/beam/pull/15307#discussion_r689121194



##########
File path: release/src/main/scripts/run_rc_validation.sh
##########
@@ -583,3 +601,148 @@ if [[ ("$python_leaderboard_direct" = true \
 else
   echo "* Skip Python Leaderboard & GameStates Validations"
 fi
+
+echo ""
+echo "====================Starting Python Cross-language Validations==============="
+if [[ ("$python_xlang_kafka_taxi_dataflow" = true
+      || "$python_xlang_sql_taxi_dataflow" = true) \
+      && ! -z `which gnome-terminal` && ! -z `which kubectl` ]]; then
+  cd ${LOCAL_BEAM_DIR}
+
+  echo "---------------------Downloading Python Staging RC----------------------------"
+  wget ${PYTHON_RC_DOWNLOAD_URL}/${RELEASE_VER}/python/apache-beam-${RELEASE_VER}.zip
+  wget ${PYTHON_RC_DOWNLOAD_URL}/${RELEASE_VER}/python/apache-beam-${RELEASE_VER}.zip.sha512
+  if [[ ! -f apache-beam-${RELEASE_VER}.zip ]]; then
+    { echo "Fail to download Python Staging RC files." ;exit 1; }
+  fi
+
+  echo "--------------------------Verifying Hashes------------------------------------"
+  sha512sum -c apache-beam-${RELEASE_VER}.zip.sha512
+
+  `which pip` install --upgrade pip
+  `which pip` install --upgrade setuptools
+  `which pip` install --upgrade virtualenv
+
+  echo "-----------------------Setting up Shell Env Vars------------------------------"
+  set_bashrc
+
+  echo "-----------------------Setting up Kafka Cluster on GKE------------------------"
+  CLUSTER_NAME=xlang-kafka-cluster-$RANDOM
+  if [[ "$python_xlang_kafka_taxi_dataflow" = true ]]; then
+    gcloud container clusters create --project=${USER_GCP_PROJECT} --region=${USER_GCP_REGION} --no-enable-ip-alias $CLUSTER_NAME
+    kubectl apply -R -f ${LOCAL_BEAM_DIR}/.test-infra/kubernetes/kafka-cluster
+    echo "* Please wait for 5 mins to let a Kafka cluster be launched on GKE."
+    echo "* Sleeping for 5 mins"
+    sleep 5m
+  else
+    echo "* Skip Kafka cluster setup"
+  fi
+
+  echo "-----------------------Building expansion service jar------------------------"
+  ./gradlew sdks:java:io:expansion-service:shadowJar
+  ./gradlew sdks:java:extensions:sql:expansion-service:shadowJar
+
+  # Run Python XLang pipelines under multiple versions of Python
+  cd ${LOCAL_BEAM_DIR}
+  for py_version in "${PYTHON_VERSIONS_TO_VALIDATE[@]}"
+  do
+    rm -rf ./beam_env_${py_version}
+    echo "--------------Setting up virtualenv with $py_version interpreter----------------"
+    virtualenv beam_env_${py_version} -p $py_version
+    . beam_env_${py_version}/bin/activate
+    ln -s ${LOCAL_BEAM_DIR}/sdks beam_env_${py_version}/lib/sdks
+
+    echo "--------------------------Installing Python SDK-------------------------------"
+    pip install apache-beam-${RELEASE_VER}.zip[gcp]
+
+    echo "----------------Starting XLang Kafka Taxi with DataflowRunner---------------------"
+    if [[ "$python_xlang_kafka_taxi_dataflow" = true ]]; then
+      BOOTSTRAP_SERVERS="$(kubectl get svc outside-0 -o jsonpath='{.status.loadBalancer.ingress[0].ip}'):32400"
+      echo "BOOTSTRAP_SERVERS=${BOOTSTRAP_SERVERS}"
+      KAFKA_TAXI_DF_DATASET=${USER}_python_validations_$(date +%m%d)_$RANDOM
+      bq mk --project_id=${USER_GCP_PROJECT} ${KAFKA_TAXI_DF_DATASET}
+      echo "export BOOTSTRAP_SERVERS=${BOOTSTRAP_SERVERS}" >> ~/.bashrc
+      echo "export KAFKA_TAXI_DF_DATASET=${KAFKA_TAXI_DF_DATASET}" >> ~/.bashrc
+
+      echo "This is a streaming job. This task will be launched in a separate terminal."
+      gnome-terminal -x sh -c \
+      "echo '*****************************************************';
+       echo '* Running Python XLang Kafka Taxi with DataflowRunner';
+       echo '*****************************************************';
+      . ${LOCAL_BEAM_DIR}/beam_env_${py_version}/bin/activate
+      python -m apache_beam.examples.kafkataxi.kafka_taxi \
+      --project=${USER_GCP_PROJECT} \
+      --region=${USER_GCP_REGION} \
+      --topic beam-runnerv2 \
+      --bootstrap_servers ${BOOTSTRAP_SERVERS} \
+      --bq_dataset ${KAFKA_TAXI_DF_DATASET} \
+      --runner DataflowRunner \
+      --num_workers 5 \
+      --temp_location=${USER_GCS_BUCKET}/temp/ \
+      --experiments=use_runner_v2 \
+      --with_metadata \
+      --sdk_location apache-beam-${RELEASE_VER}.zip; \
+      exec bash"
+
+      echo "***************************************************************"
+      echo "* Please wait for at least 10 mins to let Dataflow job be launched and results get populated."
+      echo "* Sleeping for 10 mins"
+      sleep 10m
+      echo "* How to verify results:"
+      echo "* 1. Goto your Dataflow job console and check whether there is any error."
+      echo "* 2. Check whether ${KAFKA_TAXI_DF_DATASET}.xlang_kafka_taxi has data, retrieving BigQuery data as below: "
+      bq head -n 10 ${KAFKA_TAXI_DF_DATASET}.xlang_kafka_taxi
+      echo "***************************************************************"
+    else
+      echo "* Skip Python XLang Kafka Taxi with DataflowRunner"
+    fi
+
+    echo "----------------Starting XLang SQL Taxi with DataflowRunner---------------------"
+    if [[ "$python_xlang_sql_taxi_dataflow" = true ]]; then
+      SQL_TAXI_TOPIC=${USER}_python_validations_$(date +%m%d)_$RANDOM
+      SQL_TAXI_SUBSCRIPTION=${USER}_python_validations_$(date +%m%d)_$RANDOM
+      gcloud pubsub topics create --project=${USER_GCP_PROJECT} ${SQL_TAXI_TOPIC}
+      gcloud pubsub subscriptions create --project=${USER_GCP_PROJECT} --topic=${SQL_TAXI_TOPIC} ${SQL_TAXI_SUBSCRIPTION}
+      echo "export SQL_TAXI_TOPIC=${SQL_TAXI_TOPIC}" >> ~/.bashrc
+
+      echo "This is a streaming job. This task will be launched in a separate terminal."
+      gnome-terminal -x sh -c \
+      "echo '***************************************************';
+       echo '* Running Python XLang SQL Taxi with DataflowRunner';
+       echo '***************************************************';
+      . ${LOCAL_BEAM_DIR}/beam_env_${py_version}/bin/activate
+      python -m apache_beam.examples.sql_taxi \
+      --project=${USER_GCP_PROJECT} \
+      --region=${USER_GCP_REGION} \
+      --runner DataflowRunner \
+      --num_workers 5 \
+      --temp_location=${USER_GCS_BUCKET}/temp/ \
+      --experiments=use_runner_v2 \

Review comment:
       Ditto regarding the experiment flag; see the comment below.

##########
File path: release/src/main/scripts/run_rc_validation.sh
##########
@@ -583,3 +601,148 @@ if [[ ("$python_leaderboard_direct" = true \
 else
   echo "* Skip Python Leaderboard & GameStates Validations"
 fi
+
+echo ""
+echo "====================Starting Python Cross-language Validations==============="
+if [[ ("$python_xlang_kafka_taxi_dataflow" = true
+      || "$python_xlang_sql_taxi_dataflow" = true) \
+      && ! -z `which gnome-terminal` && ! -z `which kubectl` ]]; then
+  cd ${LOCAL_BEAM_DIR}
+
+  echo "---------------------Downloading Python Staging RC----------------------------"
+  wget ${PYTHON_RC_DOWNLOAD_URL}/${RELEASE_VER}/python/apache-beam-${RELEASE_VER}.zip
+  wget ${PYTHON_RC_DOWNLOAD_URL}/${RELEASE_VER}/python/apache-beam-${RELEASE_VER}.zip.sha512
+  if [[ ! -f apache-beam-${RELEASE_VER}.zip ]]; then
+    { echo "Fail to download Python Staging RC files." ;exit 1; }
+  fi
+
+  echo "--------------------------Verifying Hashes------------------------------------"
+  sha512sum -c apache-beam-${RELEASE_VER}.zip.sha512
+
+  `which pip` install --upgrade pip
+  `which pip` install --upgrade setuptools
+  `which pip` install --upgrade virtualenv
+
+  echo "-----------------------Setting up Shell Env Vars------------------------------"
+  set_bashrc
+
+  echo "-----------------------Setting up Kafka Cluster on GKE------------------------"
+  CLUSTER_NAME=xlang-kafka-cluster-$RANDOM
+  if [[ "$python_xlang_kafka_taxi_dataflow" = true ]]; then
+    gcloud container clusters create --project=${USER_GCP_PROJECT} --region=${USER_GCP_REGION} --no-enable-ip-alias $CLUSTER_NAME
+    kubectl apply -R -f ${LOCAL_BEAM_DIR}/.test-infra/kubernetes/kafka-cluster
+    echo "* Please wait for 5 mins to let a Kafka cluster be launched on GKE."
+    echo "* Sleeping for 5 mins"
+    sleep 5m
+  else
+    echo "* Skip Kafka cluster setup"
+  fi
+
+  echo "-----------------------Building expansion service jar------------------------"
+  ./gradlew sdks:java:io:expansion-service:shadowJar
+  ./gradlew sdks:java:extensions:sql:expansion-service:shadowJar
+
+  # Run Python XLang pipelines under multiple versions of Python
+  cd ${LOCAL_BEAM_DIR}
+  for py_version in "${PYTHON_VERSIONS_TO_VALIDATE[@]}"
+  do
+    rm -rf ./beam_env_${py_version}
+    echo "--------------Setting up virtualenv with $py_version interpreter----------------"
+    virtualenv beam_env_${py_version} -p $py_version
+    . beam_env_${py_version}/bin/activate
+    ln -s ${LOCAL_BEAM_DIR}/sdks beam_env_${py_version}/lib/sdks
+
+    echo "--------------------------Installing Python SDK-------------------------------"
+    pip install apache-beam-${RELEASE_VER}.zip[gcp]
+
+    echo "----------------Starting XLang Kafka Taxi with DataflowRunner---------------------"
+    if [[ "$python_xlang_kafka_taxi_dataflow" = true ]]; then
+      BOOTSTRAP_SERVERS="$(kubectl get svc outside-0 -o jsonpath='{.status.loadBalancer.ingress[0].ip}'):32400"
+      echo "BOOTSTRAP_SERVERS=${BOOTSTRAP_SERVERS}"
+      KAFKA_TAXI_DF_DATASET=${USER}_python_validations_$(date +%m%d)_$RANDOM
+      bq mk --project_id=${USER_GCP_PROJECT} ${KAFKA_TAXI_DF_DATASET}
+      echo "export BOOTSTRAP_SERVERS=${BOOTSTRAP_SERVERS}" >> ~/.bashrc
+      echo "export KAFKA_TAXI_DF_DATASET=${KAFKA_TAXI_DF_DATASET}" >> ~/.bashrc
+
+      echo "This is a streaming job. This task will be launched in a separate terminal."
+      gnome-terminal -x sh -c \
+      "echo '*****************************************************';
+       echo '* Running Python XLang Kafka Taxi with DataflowRunner';
+       echo '*****************************************************';
+      . ${LOCAL_BEAM_DIR}/beam_env_${py_version}/bin/activate
+      python -m apache_beam.examples.kafkataxi.kafka_taxi \
+      --project=${USER_GCP_PROJECT} \
+      --region=${USER_GCP_REGION} \
+      --topic beam-runnerv2 \
+      --bootstrap_servers ${BOOTSTRAP_SERVERS} \
+      --bq_dataset ${KAFKA_TAXI_DF_DATASET} \
+      --runner DataflowRunner \
+      --num_workers 5 \
+      --temp_location=${USER_GCS_BUCKET}/temp/ \
+      --experiments=use_runner_v2 \

Review comment:
       You should not need to manually specify this experiment for Beam 2.32.0 and later.
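
       For illustration, one hypothetical way to honor this while still supporting older release candidates is to compute the flag only for pre-2.32.0 versions; the guard below is a sketch, not part of the PR:

         EXTRA_EXPERIMENTS=""
         # Set use_runner_v2 only when validating a release older than 2.32.0 (sort -V compares versions).
         if [[ "$(printf '%s\n' "2.32.0" "${RELEASE_VER}" | sort -V | head -n1)" != "2.32.0" ]]; then
           EXTRA_EXPERIMENTS="--experiments=use_runner_v2"
         fi
         # ...then pass ${EXTRA_EXPERIMENTS} to the pipeline invocation instead of hard-coding the flag.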

##########
File path: release/src/main/scripts/run_rc_validation.sh
##########
@@ -168,8 +184,7 @@ bq version
 echo "-----------------Checking gnome-terminal-----------------"
 if [[ -z `which gnome-terminal` ]]; then
   echo "You don't have gnome-terminal installed."
-  if [[ "$INSTALL_GNOME_TERMINAL" != true ]]; then
-    sudo apt-get upgrade
+  if [[ "$INSTALL_GNOME_TERMINAL" = true ]]; then

Review comment:
       The condition here was inverted. Did we have a bug before?
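
       A sketch of the corrected branch (only the flag flip is visible in this hunk; the install command below is an assumption about what the full block does):

         if [[ -z `which gnome-terminal` ]]; then
           echo "You don't have gnome-terminal installed."
           if [[ "$INSTALL_GNOME_TERMINAL" = true ]]; then
             # Install only when the user explicitly opted in via INSTALL_GNOME_TERMINAL.
             sudo apt-get install gnome-terminal
           else
             echo "* gnome-terminal is missing; validations that launch separate terminals will be skipped."
           fi
         fi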

##########
File path: release/src/main/scripts/run_rc_validation.sh
##########
@@ -178,6 +193,18 @@ if [[ -z `which gnome-terminal` ]]; then
 fi
 gnome-terminal --version
 
+echo "-----------------Checking kubectl-----------------"
+if [[ -z `which kubectl` ]]; then
+  echo "You don't have kubectl installed."
+  if [[ "$INSTALL_KUBECTL" = true ]]; then
+    sudo apt-get install kubectl
+  else
+    echo "kubectl is not installed. Validation on Python cross-language Kafka taxi will be skipped."

Review comment:
       Should this be a failure for validation instead of skipping?
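
       A minimal sketch of that fail-fast alternative (the message and exit code are illustrative):

         if [[ -z `which kubectl` ]]; then
           echo "You don't have kubectl installed."
           if [[ "$INSTALL_KUBECTL" = true ]]; then
             sudo apt-get install kubectl
           else
             # Fail the validation rather than silently skipping the Kafka taxi check.
             { echo "kubectl is required for the Python cross-language Kafka taxi validation."; exit 1; }
           fi
         fi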

##########
File path: release/src/main/scripts/run_rc_validation.sh
##########
@@ -583,3 +601,148 @@ if [[ ("$python_leaderboard_direct" = true \
 else
   echo "* Skip Python Leaderboard & GameStates Validations"
 fi
+
+echo ""
+echo "====================Starting Python Cross-language Validations==============="
+if [[ ("$python_xlang_kafka_taxi_dataflow" = true
+      || "$python_xlang_sql_taxi_dataflow" = true) \
+      && ! -z `which gnome-terminal` && ! -z `which kubectl` ]]; then
+  cd ${LOCAL_BEAM_DIR}
+
+  echo "---------------------Downloading Python Staging RC----------------------------"
+  wget ${PYTHON_RC_DOWNLOAD_URL}/${RELEASE_VER}/python/apache-beam-${RELEASE_VER}.zip
+  wget ${PYTHON_RC_DOWNLOAD_URL}/${RELEASE_VER}/python/apache-beam-${RELEASE_VER}.zip.sha512
+  if [[ ! -f apache-beam-${RELEASE_VER}.zip ]]; then
+    { echo "Fail to download Python Staging RC files." ;exit 1; }
+  fi
+
+  echo "--------------------------Verifying Hashes------------------------------------"
+  sha512sum -c apache-beam-${RELEASE_VER}.zip.sha512
+
+  `which pip` install --upgrade pip
+  `which pip` install --upgrade setuptools
+  `which pip` install --upgrade virtualenv
+
+  echo "-----------------------Setting up Shell Env Vars------------------------------"
+  set_bashrc
+
+  echo "-----------------------Setting up Kafka Cluster on GKE------------------------"
+  CLUSTER_NAME=xlang-kafka-cluster-$RANDOM
+  if [[ "$python_xlang_kafka_taxi_dataflow" = true ]]; then
+    gcloud container clusters create --project=${USER_GCP_PROJECT} --region=${USER_GCP_REGION} --no-enable-ip-alias $CLUSTER_NAME
+    kubectl apply -R -f ${LOCAL_BEAM_DIR}/.test-infra/kubernetes/kafka-cluster
+    echo "* Please wait for 5 mins to let a Kafka cluster be launched on GKE."
+    echo "* Sleeping for 5 mins"
+    sleep 5m
+  else
+    echo "* Skip Kafka cluster setup"
+  fi
+
+  echo "-----------------------Building expansion service jar------------------------"
+  ./gradlew sdks:java:io:expansion-service:shadowJar
+  ./gradlew sdks:java:extensions:sql:expansion-service:shadowJar
+
+  # Run Python XLang pipelines under multiple versions of Python
+  cd ${LOCAL_BEAM_DIR}
+  for py_version in "${PYTHON_VERSIONS_TO_VALIDATE[@]}"
+  do
+    rm -rf ./beam_env_${py_version}
+    echo "--------------Setting up virtualenv with $py_version interpreter----------------"
+    virtualenv beam_env_${py_version} -p $py_version
+    . beam_env_${py_version}/bin/activate
+    ln -s ${LOCAL_BEAM_DIR}/sdks beam_env_${py_version}/lib/sdks
+
+    echo "--------------------------Installing Python SDK-------------------------------"
+    pip install apache-beam-${RELEASE_VER}.zip[gcp]
+
+    echo "----------------Starting XLang Kafka Taxi with DataflowRunner---------------------"
+    if [[ "$python_xlang_kafka_taxi_dataflow" = true ]]; then
+      BOOTSTRAP_SERVERS="$(kubectl get svc outside-0 -o jsonpath='{.status.loadBalancer.ingress[0].ip}'):32400"
+      echo "BOOTSTRAP_SERVERS=${BOOTSTRAP_SERVERS}"
+      KAFKA_TAXI_DF_DATASET=${USER}_python_validations_$(date +%m%d)_$RANDOM
+      bq mk --project_id=${USER_GCP_PROJECT} ${KAFKA_TAXI_DF_DATASET}
+      echo "export BOOTSTRAP_SERVERS=${BOOTSTRAP_SERVERS}" >> ~/.bashrc
+      echo "export KAFKA_TAXI_DF_DATASET=${KAFKA_TAXI_DF_DATASET}" >> ~/.bashrc
+
+      echo "This is a streaming job. This task will be launched in a separate terminal."
+      gnome-terminal -x sh -c \
+      "echo '*****************************************************';
+       echo '* Running Python XLang Kafka Taxi with DataflowRunner';
+       echo '*****************************************************';
+      . ${LOCAL_BEAM_DIR}/beam_env_${py_version}/bin/activate
+      python -m apache_beam.examples.kafkataxi.kafka_taxi \
+      --project=${USER_GCP_PROJECT} \
+      --region=${USER_GCP_REGION} \
+      --topic beam-runnerv2 \
+      --bootstrap_servers ${BOOTSTRAP_SERVERS} \
+      --bq_dataset ${KAFKA_TAXI_DF_DATASET} \
+      --runner DataflowRunner \
+      --num_workers 5 \
+      --temp_location=${USER_GCS_BUCKET}/temp/ \
+      --experiments=use_runner_v2 \
+      --with_metadata \
+      --sdk_location apache-beam-${RELEASE_VER}.zip; \
+      exec bash"
+
+      echo "***************************************************************"
+      echo "* Please wait for at least 10 mins to let Dataflow job be launched and results get populated."
+      echo "* Sleeping for 10 mins"
+      sleep 10m
+      echo "* How to verify results:"
+      echo "* 1. Goto your Dataflow job console and check whether there is any error."
+      echo "* 2. Check whether ${KAFKA_TAXI_DF_DATASET}.xlang_kafka_taxi has data, retrieving BigQuery data as below: "

Review comment:
       Would it be possible to run a 'grep' to confirm that the output is not empty?
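
       One hedged way to automate that check is to assert a non-zero row count rather than relying on a human reading the bq head output; the query below is illustrative, with names taken from the script above:

         row_count=$(bq query --project_id=${USER_GCP_PROJECT} --use_legacy_sql=false --format=csv \
           "SELECT COUNT(*) FROM ${KAFKA_TAXI_DF_DATASET}.xlang_kafka_taxi" | tail -n 1)
         if [[ "${row_count}" == "0" ]]; then
           echo "Validation failed: ${KAFKA_TAXI_DF_DATASET}.xlang_kafka_taxi has no rows."
           exit 1
         fi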

##########
File path: release/src/main/scripts/run_rc_validation.sh
##########
@@ -583,3 +601,148 @@ if [[ ("$python_leaderboard_direct" = true \
 else
   echo "* Skip Python Leaderboard & GameStates Validations"
 fi
+
+echo ""
+echo "====================Starting Python Cross-language Validations==============="
+if [[ ("$python_xlang_kafka_taxi_dataflow" = true
+      || "$python_xlang_sql_taxi_dataflow" = true) \
+      && ! -z `which gnome-terminal` && ! -z `which kubectl` ]]; then
+  cd ${LOCAL_BEAM_DIR}
+
+  echo "---------------------Downloading Python Staging RC----------------------------"
+  wget ${PYTHON_RC_DOWNLOAD_URL}/${RELEASE_VER}/python/apache-beam-${RELEASE_VER}.zip
+  wget ${PYTHON_RC_DOWNLOAD_URL}/${RELEASE_VER}/python/apache-beam-${RELEASE_VER}.zip.sha512
+  if [[ ! -f apache-beam-${RELEASE_VER}.zip ]]; then
+    { echo "Fail to download Python Staging RC files." ;exit 1; }
+  fi
+
+  echo "--------------------------Verifying Hashes------------------------------------"
+  sha512sum -c apache-beam-${RELEASE_VER}.zip.sha512
+
+  `which pip` install --upgrade pip
+  `which pip` install --upgrade setuptools
+  `which pip` install --upgrade virtualenv
+
+  echo "-----------------------Setting up Shell Env Vars------------------------------"
+  set_bashrc
+
+  echo "-----------------------Setting up Kafka Cluster on GKE------------------------"
+  CLUSTER_NAME=xlang-kafka-cluster-$RANDOM
+  if [[ "$python_xlang_kafka_taxi_dataflow" = true ]]; then
+    gcloud container clusters create --project=${USER_GCP_PROJECT} --region=${USER_GCP_REGION} --no-enable-ip-alias $CLUSTER_NAME
+    kubectl apply -R -f ${LOCAL_BEAM_DIR}/.test-infra/kubernetes/kafka-cluster

Review comment:
       Did you confirm that this works? I think the Beam Kafka IT is currently failing due to a port issue when starting up this cluster: https://issues.apache.org/jira/browse/BEAM-9482
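
       Relatedly, polling for the broker endpoint instead of a fixed 5-minute sleep would surface a cluster that never comes up (as in BEAM-9482) as an early failure; a rough sketch, with arbitrary retry count and interval:

         for attempt in $(seq 1 30); do
           KAFKA_IP=$(kubectl get svc outside-0 -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
           [[ -n "${KAFKA_IP}" ]] && break
           echo "* Waiting for the Kafka LoadBalancer IP (attempt ${attempt})..."
           sleep 20
         done
         [[ -n "${KAFKA_IP}" ]] || { echo "Kafka cluster did not come up; see BEAM-9482."; exit 1; }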

##########
File path: release/src/main/scripts/run_rc_validation.sh
##########
@@ -583,3 +601,148 @@ if [[ ("$python_leaderboard_direct" = true \
 else
   echo "* Skip Python Leaderboard & GameStates Validations"
 fi
+
+echo ""
+echo "====================Starting Python Cross-language Validations==============="
+if [[ ("$python_xlang_kafka_taxi_dataflow" = true
+      || "$python_xlang_sql_taxi_dataflow" = true) \
+      && ! -z `which gnome-terminal` && ! -z `which kubectl` ]]; then
+  cd ${LOCAL_BEAM_DIR}
+
+  echo "---------------------Downloading Python Staging RC----------------------------"
+  wget ${PYTHON_RC_DOWNLOAD_URL}/${RELEASE_VER}/python/apache-beam-${RELEASE_VER}.zip
+  wget ${PYTHON_RC_DOWNLOAD_URL}/${RELEASE_VER}/python/apache-beam-${RELEASE_VER}.zip.sha512
+  if [[ ! -f apache-beam-${RELEASE_VER}.zip ]]; then
+    { echo "Fail to download Python Staging RC files." ;exit 1; }
+  fi
+
+  echo "--------------------------Verifying Hashes------------------------------------"
+  sha512sum -c apache-beam-${RELEASE_VER}.zip.sha512
+
+  `which pip` install --upgrade pip
+  `which pip` install --upgrade setuptools
+  `which pip` install --upgrade virtualenv
+
+  echo "-----------------------Setting up Shell Env Vars------------------------------"
+  set_bashrc
+
+  echo "-----------------------Setting up Kafka Cluster on GKE------------------------"
+  CLUSTER_NAME=xlang-kafka-cluster-$RANDOM
+  if [[ "$python_xlang_kafka_taxi_dataflow" = true ]]; then
+    gcloud container clusters create --project=${USER_GCP_PROJECT} --region=${USER_GCP_REGION} --no-enable-ip-alias $CLUSTER_NAME
+    kubectl apply -R -f ${LOCAL_BEAM_DIR}/.test-infra/kubernetes/kafka-cluster
+    echo "* Please wait for 5 mins to let a Kafka cluster be launched on GKE."
+    echo "* Sleeping for 5 mins"
+    sleep 5m
+  else
+    echo "* Skip Kafka cluster setup"
+  fi
+
+  echo "-----------------------Building expansion service jar------------------------"
+  ./gradlew sdks:java:io:expansion-service:shadowJar
+  ./gradlew sdks:java:extensions:sql:expansion-service:shadowJar
+
+  # Run Python XLang pipelines under multiple versions of Python
+  cd ${LOCAL_BEAM_DIR}
+  for py_version in "${PYTHON_VERSIONS_TO_VALIDATE[@]}"
+  do
+    rm -rf ./beam_env_${py_version}
+    echo "--------------Setting up virtualenv with $py_version interpreter----------------"
+    virtualenv beam_env_${py_version} -p $py_version
+    . beam_env_${py_version}/bin/activate
+    ln -s ${LOCAL_BEAM_DIR}/sdks beam_env_${py_version}/lib/sdks
+
+    echo "--------------------------Installing Python SDK-------------------------------"
+    pip install apache-beam-${RELEASE_VER}.zip[gcp]
+
+    echo "----------------Starting XLang Kafka Taxi with DataflowRunner---------------------"
+    if [[ "$python_xlang_kafka_taxi_dataflow" = true ]]; then
+      BOOTSTRAP_SERVERS="$(kubectl get svc outside-0 -o jsonpath='{.status.loadBalancer.ingress[0].ip}'):32400"
+      echo "BOOTSTRAP_SERVERS=${BOOTSTRAP_SERVERS}"
+      KAFKA_TAXI_DF_DATASET=${USER}_python_validations_$(date +%m%d)_$RANDOM
+      bq mk --project_id=${USER_GCP_PROJECT} ${KAFKA_TAXI_DF_DATASET}
+      echo "export BOOTSTRAP_SERVERS=${BOOTSTRAP_SERVERS}" >> ~/.bashrc
+      echo "export KAFKA_TAXI_DF_DATASET=${KAFKA_TAXI_DF_DATASET}" >> ~/.bashrc
+
+      echo "This is a streaming job. This task will be launched in a separate terminal."
+      gnome-terminal -x sh -c \
+      "echo '*****************************************************';
+       echo '* Running Python XLang Kafka Taxi with DataflowRunner';
+       echo '*****************************************************';
+      . ${LOCAL_BEAM_DIR}/beam_env_${py_version}/bin/activate
+      python -m apache_beam.examples.kafkataxi.kafka_taxi \
+      --project=${USER_GCP_PROJECT} \
+      --region=${USER_GCP_REGION} \
+      --topic beam-runnerv2 \
+      --bootstrap_servers ${BOOTSTRAP_SERVERS} \
+      --bq_dataset ${KAFKA_TAXI_DF_DATASET} \
+      --runner DataflowRunner \
+      --num_workers 5 \
+      --temp_location=${USER_GCS_BUCKET}/temp/ \
+      --experiments=use_runner_v2 \
+      --with_metadata \
+      --sdk_location apache-beam-${RELEASE_VER}.zip; \
+      exec bash"
+
+      echo "***************************************************************"
+      echo "* Please wait for at least 10 mins to let Dataflow job be launched and results get populated."
+      echo "* Sleeping for 10 mins"
+      sleep 10m
+      echo "* How to verify results:"
+      echo "* 1. Goto your Dataflow job console and check whether there is any error."
+      echo "* 2. Check whether ${KAFKA_TAXI_DF_DATASET}.xlang_kafka_taxi has data, retrieving BigQuery data as below: "
+      bq head -n 10 ${KAFKA_TAXI_DF_DATASET}.xlang_kafka_taxi
+      echo "***************************************************************"
+    else
+      echo "* Skip Python XLang Kafka Taxi with DataflowRunner"
+    fi
+
+    echo "----------------Starting XLang SQL Taxi with DataflowRunner---------------------"
+    if [[ "$python_xlang_sql_taxi_dataflow" = true ]]; then
+      SQL_TAXI_TOPIC=${USER}_python_validations_$(date +%m%d)_$RANDOM
+      SQL_TAXI_SUBSCRIPTION=${USER}_python_validations_$(date +%m%d)_$RANDOM
+      gcloud pubsub topics create --project=${USER_GCP_PROJECT} ${SQL_TAXI_TOPIC}
+      gcloud pubsub subscriptions create --project=${USER_GCP_PROJECT} --topic=${SQL_TAXI_TOPIC} ${SQL_TAXI_SUBSCRIPTION}
+      echo "export SQL_TAXI_TOPIC=${SQL_TAXI_TOPIC}" >> ~/.bashrc
+
+      echo "This is a streaming job. This task will be launched in a separate terminal."
+      gnome-terminal -x sh -c \
+      "echo '***************************************************';
+       echo '* Running Python XLang SQL Taxi with DataflowRunner';
+       echo '***************************************************';
+      . ${LOCAL_BEAM_DIR}/beam_env_${py_version}/bin/activate
+      python -m apache_beam.examples.sql_taxi \
+      --project=${USER_GCP_PROJECT} \
+      --region=${USER_GCP_REGION} \
+      --runner DataflowRunner \
+      --num_workers 5 \
+      --temp_location=${USER_GCS_BUCKET}/temp/ \
+      --experiments=use_runner_v2 \
+      --output_topic projects/${USER_GCP_PROJECT}/topics/${SQL_TAXI_TOPIC} \
+      --sdk_location apache-beam-${RELEASE_VER}.zip; \
+      exec bash"
+
+      echo "***************************************************************"
+      echo "* Please wait for at least 10 mins to let Dataflow job be launched and results get populated."
+      echo "* Sleeping for 10 mins"
+      sleep 10m
+      echo "* How to verify results:"
+      echo "* 1. Goto your Dataflow job console and check whether there is any error."
+      echo "* 2. Check whether your ${SQL_TAXI_SUBSCRIPTION} subscription has data below:"
+      # run twice since the first execution would return 0 messages

Review comment:
       Any idea why?
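
       One likely reason: Pub/Sub pull is best-effort, so a single gcloud pull can legitimately return zero messages even when data is waiting. A rough sketch of a retry loop that would make the check less flaky (retry count and sleep are arbitrary; it assumes gcloud's "Listed 0 items." notice goes to stderr, as gcloud status messages usually do):

         msgs=""
         for attempt in $(seq 1 5); do
           msgs=$(gcloud pubsub subscriptions pull --project=${USER_GCP_PROJECT} --limit=10 ${SQL_TAXI_SUBSCRIPTION})
           [[ -n "${msgs}" ]] && break
           sleep 30
         done
         [[ -n "${msgs}" ]] || echo "* No messages pulled from ${SQL_TAXI_SUBSCRIPTION}; check the Dataflow job."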




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org