Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/29 21:20:04 UTC

[GitHub] [beam] pabloem commented on a diff in pull request #17727: [BEAM-9482] Fix "provided port is already allocated" for PerformanceTestsKafka

pabloem commented on code in PR #17727:
URL: https://github.com/apache/beam/pull/17727#discussion_r910413494


##########
.test-infra/jenkins/Kubernetes.groovy:
##########
@@ -132,4 +132,28 @@ class Kubernetes {
       }
     }
   }
+
+  /**
+   * Specifies steps that will return an available port on Kubernetes cluster
+   *
+   * @param lowRangePort - low range port to be used
+   * @param highRangePort - high range port to be used
+   * @param referenceName - name of the environment variable
+   */
+  void availablePort(String lowRangePort, String highRangePort, String referenceName) {

Review Comment:
   Is there a way to log what's happening so we can debug in case of failures?
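
   One possible shape for that logging, as a rough sketch in POSIX shell. The names `log` and `find_free_port` and the space-separated list of used ports are hypothetical and not taken from this PR; in a real cluster the list would have to be read from the cluster itself (for example via `kubectl get svc`).

```shell
# Hypothetical helper, not the PR's implementation: pick the first port in
# [low, high] that is not in a list of ports already in use, logging each step.

log() {
  # Diagnostics go to stderr so that stdout carries only the chosen port.
  echo "[availablePort] $*" >&2
}

find_free_port() {
  fp_low="$1"    # first port to try
  fp_high="$2"   # last port to try
  fp_used="$3"   # space-separated ports already allocated (assumed input)
  fp_port="$fp_low"

  log "searching ${fp_low}-${fp_high}; ports in use: ${fp_used:-none}"
  while [ "$fp_port" -le "$fp_high" ]; do
    case " $fp_used " in
      *" $fp_port "*)
        log "port $fp_port is already allocated, trying next"
        ;;
      *)
        log "selected port $fp_port"
        echo "$fp_port"
        return 0
        ;;
    esac
    fp_port=$((fp_port + 1))
  done

  log "no free port found in ${fp_low}-${fp_high}"
  return 1
}
```

   For example, `find_free_port 32400 32767 "32400 32401"` prints `32402` on stdout, while the lines on stderr record which ports were skipped and which one was picked.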



##########
.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy:
##########
@@ -21,23 +21,37 @@ import Kubernetes
 import InfluxDBCredentialsHelper
 
 String jobName = "beam_PerformanceTests_Kafka_IO"
+String HIGH_RANGE_PORT = "32767"
 
 job(jobName) {
-  common.setTopLevelMainJobProperties(delegate)
-  // TODO(https://github.com/apache/beam/issues/20333): Re-enable once fixed.
-  // common.setAutoJob(delegate, 'H H/6 * * *')
-  //  common.enablePhraseTriggeringFromPullRequest(
-  //      delegate,
-  //      'Java KafkaIO Performance Test',
-  //      'Run Java KafkaIO Performance Test')
+  common.setTopLevelMainJobProperties(delegate, 'master', 120)
+  common.setAutoJob(delegate, 'H H/6 * * *')
+  common.enablePhraseTriggeringFromPullRequest(
+      delegate,
+      'Java KafkaIO Performance Test',
+      'Run Java KafkaIO Performance Test')
   InfluxDBCredentialsHelper.useCredentials(delegate)
 
   String namespace = common.getKubernetesNamespace(jobName)
   String kubeconfig = common.getKubeconfigLocationForNamespace(namespace)
   Kubernetes k8s = Kubernetes.create(delegate, kubeconfig, namespace)
-  k8s.apply(common.makePathAbsolute("src/.test-infra/kubernetes/kafka-cluster"))
 
+  String kafkaDir = common.makePathAbsolute("src/.test-infra/kubernetes/kafka-cluster")
+  String kafkaTopicJob="job.batch/kafka-config-eff079ec"
+
+  // Select available ports for services and avoid collisions
+  steps {
+    String[] configuredPorts = ["32400", "32401", "32402"]
+    (0..2).each { service ->
+      k8s.availablePort(service == 0 ? configuredPorts[service] : "\$KAFKA_SERVICE_PORT_${service-1}",
+          HIGH_RANGE_PORT, "KAFKA_SERVICE_PORT_$service")
+      shell("sed -i -e s/${configuredPorts[service]}/\$KAFKA_SERVICE_PORT_$service/ \
+                  ${kafkaDir}/04-outside-services/outside-${service}.yml")
+    }
+  }

Review Comment:
   Can you add a comment so I can understand what this does? To be honest, I don't know where `KAFKA_SERVICE_PORT_` and other special variables are coming from.
   Please also document this in the confluence wiki.
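
   One plausible reading of the loop, offered as an assumption since the body of `availablePort` is not shown: `availablePort` exports the chosen port under `referenceName` (here `KAFKA_SERVICE_PORT_0`), and `sed` then swaps the hard-coded port in the service manifest for that value. The manifest line and the port value `32410` below are made up for illustration.

```shell
# Hypothetical stand-in for one line of outside-0.yml; the real file is not shown here.
manifest_line='    nodePort: 32400'

# Assumed to have been exported by the availablePort step for service 0.
KAFKA_SERVICE_PORT_0=32410

# Same substitution as the job's sed call, minus -i, so it prints the result
# instead of editing the file in place.
rewritten=$(printf '%s\n' "$manifest_line" | sed -e "s/32400/$KAFKA_SERVICE_PORT_0/")
echo "$rewritten"
```

   For services 1 and 2 the same pattern would apply with `32401` and `32402`, and the search for each port starts from the previous service's `KAFKA_SERVICE_PORT_` value rather than from the configured one.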



##########
.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy:
##########
@@ -21,23 +21,37 @@ import Kubernetes
 import InfluxDBCredentialsHelper
 
 String jobName = "beam_PerformanceTests_Kafka_IO"
+String HIGH_RANGE_PORT = "32767"
 
 job(jobName) {
-  common.setTopLevelMainJobProperties(delegate)
-  // TODO(https://github.com/apache/beam/issues/20333): Re-enable once fixed.
-  // common.setAutoJob(delegate, 'H H/6 * * *')
-  //  common.enablePhraseTriggeringFromPullRequest(
-  //      delegate,
-  //      'Java KafkaIO Performance Test',
-  //      'Run Java KafkaIO Performance Test')
+  common.setTopLevelMainJobProperties(delegate, 'master', 120)
+  common.setAutoJob(delegate, 'H H/6 * * *')
+  common.enablePhraseTriggeringFromPullRequest(
+      delegate,
+      'Java KafkaIO Performance Test',
+      'Run Java KafkaIO Performance Test')
   InfluxDBCredentialsHelper.useCredentials(delegate)
 
   String namespace = common.getKubernetesNamespace(jobName)
   String kubeconfig = common.getKubeconfigLocationForNamespace(namespace)
   Kubernetes k8s = Kubernetes.create(delegate, kubeconfig, namespace)
-  k8s.apply(common.makePathAbsolute("src/.test-infra/kubernetes/kafka-cluster"))
 
+  String kafkaDir = common.makePathAbsolute("src/.test-infra/kubernetes/kafka-cluster")
+  String kafkaTopicJob="job.batch/kafka-config-eff079ec"

Review Comment:
   ```suggestion
     String kafkaTopicJob = "job.batch/kafka-config-eff079ec"
   ```
   nitpicking : )



##########
.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy:
##########
@@ -21,23 +21,37 @@ import Kubernetes
 import InfluxDBCredentialsHelper
 
 String jobName = "beam_PerformanceTests_Kafka_IO"
+String HIGH_RANGE_PORT = "32767"
 
 job(jobName) {
-  common.setTopLevelMainJobProperties(delegate)
-  // TODO(https://github.com/apache/beam/issues/20333): Re-enable once fixed.
-  // common.setAutoJob(delegate, 'H H/6 * * *')
-  //  common.enablePhraseTriggeringFromPullRequest(
-  //      delegate,
-  //      'Java KafkaIO Performance Test',
-  //      'Run Java KafkaIO Performance Test')
+  common.setTopLevelMainJobProperties(delegate, 'master', 120)
+  common.setAutoJob(delegate, 'H H/6 * * *')
+  common.enablePhraseTriggeringFromPullRequest(
+      delegate,
+      'Java KafkaIO Performance Test',
+      'Run Java KafkaIO Performance Test')
   InfluxDBCredentialsHelper.useCredentials(delegate)
 
   String namespace = common.getKubernetesNamespace(jobName)
   String kubeconfig = common.getKubeconfigLocationForNamespace(namespace)
   Kubernetes k8s = Kubernetes.create(delegate, kubeconfig, namespace)
-  k8s.apply(common.makePathAbsolute("src/.test-infra/kubernetes/kafka-cluster"))
 
+  String kafkaDir = common.makePathAbsolute("src/.test-infra/kubernetes/kafka-cluster")
+  String kafkaTopicJob="job.batch/kafka-config-eff079ec"
+
+  // Select available ports for services and avoid collisions
+  steps {
+    String[] configuredPorts = ["32400", "32401", "32402"]
+    (0..2).each { service ->
+      k8s.availablePort(service == 0 ? configuredPorts[service] : "\$KAFKA_SERVICE_PORT_${service-1}",
+          HIGH_RANGE_PORT, "KAFKA_SERVICE_PORT_$service")
+      shell("sed -i -e s/${configuredPorts[service]}/\$KAFKA_SERVICE_PORT_$service/ \
+                  ${kafkaDir}/04-outside-services/outside-${service}.yml")
+    }
+  }

Review Comment:
   Please add a comment at the top describing what resources are used by this test and what the test does.


