You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@beam.apache.org by yh...@apache.org on 2023/07/22 00:19:14 UTC
[beam] branch master updated: Fix Python Flink runner load tests & Stop publish Python SDK image in beam_portability (#27595)
This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 97cb32ea5dd Fix Python Flink runner load tests & Stop publish Python SDK image in beam_portability (#27595)
97cb32ea5dd is described below
commit 97cb32ea5dd087068ff476a49f845ea8fe0fe3ef
Author: Yi Hu <ya...@google.com>
AuthorDate: Fri Jul 21 20:19:07 2023 -0400
Fix Python Flink runner load tests & Stop publish Python SDK image in beam_portability (#27595)
---
.test-infra/jenkins/LoadTestsBuilder.groovy | 6 ++--
.../jenkins/job_LoadTests_Combine_Flink_Go.groovy | 4 +--
.../job_LoadTests_Combine_Flink_Python.groovy | 3 +-
.../jenkins/job_LoadTests_GBK_Flink_Go.groovy | 4 +--
.../jenkins/job_LoadTests_GBK_Flink_Python.groovy | 3 +-
.../jenkins/job_LoadTests_ParDo_Flink_Go.groovy | 4 +--
.../job_LoadTests_ParDo_Flink_Python.groovy | 3 +-
.../job_LoadTests_SideInput_Flink_Go.groovy | 4 +--
.../jenkins/job_LoadTests_coGBK_Flink_Go.groovy | 4 +--
.../job_LoadTests_coGBK_Flink_Python.groovy | 3 +-
...Commit_Python_Chicago_Taxi_Example_Flink.groovy | 3 +-
.../jenkins/job_Publish_Docker_Snapshots.groovy | 4 +--
.../fnexecution/environment/DockerCommand.java | 41 +++++++++++++++-------
13 files changed, 53 insertions(+), 33 deletions(-)
diff --git a/.test-infra/jenkins/LoadTestsBuilder.groovy b/.test-infra/jenkins/LoadTestsBuilder.groovy
index b5b0b664acf..060a2ea6542 100644
--- a/.test-infra/jenkins/LoadTestsBuilder.groovy
+++ b/.test-infra/jenkins/LoadTestsBuilder.groovy
@@ -24,10 +24,10 @@ import InfluxDBCredentialsHelper
import static PythonTestProperties.LOAD_TEST_PYTHON_VERSION
class LoadTestsBuilder {
- final static String DOCKER_CONTAINER_REGISTRY = 'gcr.io/apache-beam-testing/beam_portability'
- final static String DOCKER_CONTAINER_REGISTRY_SNAPSHOTS = 'gcr.io/apache-beam-testing/beam-sdk'
- final static String GO_SDK_CONTAINER = "${DOCKER_CONTAINER_REGISTRY_SNAPSHOTS}/beam_go_sdk:latest"
+ final static String DOCKER_CONTAINER_REGISTRY = 'gcr.io/apache-beam-testing/beam-sdk'
+ final static String GO_SDK_CONTAINER = "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest"
final static String DOCKER_BEAM_SDK_IMAGE = "beam_python${LOAD_TEST_PYTHON_VERSION}_sdk:latest"
+ final static String DOCKER_BEAM_JOBSERVER = 'gcr.io/apache-beam-testing/beam_portability'
static void loadTests(scope, CommonTestProperties.SDK sdk, List testConfigurations, String test, String mode,
List<String> jobSpecificSwitches = null) {
diff --git a/.test-infra/jenkins/job_LoadTests_Combine_Flink_Go.groovy b/.test-infra/jenkins/job_LoadTests_Combine_Flink_Go.groovy
index e161ba0c0de..9b8adc732f9 100644
--- a/.test-infra/jenkins/job_LoadTests_Combine_Flink_Go.groovy
+++ b/.test-infra/jenkins/job_LoadTests_Combine_Flink_Go.groovy
@@ -23,7 +23,7 @@ import PhraseTriggeringPostCommitBuilder
import Flink
import InfluxDBCredentialsHelper
-import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY
+import static LoadTestsBuilder.DOCKER_BEAM_JOBSERVER
import static LoadTestsBuilder.GO_SDK_CONTAINER
@@ -107,7 +107,7 @@ def loadTestJob = { scope, triggeringContext, mode ->
GO_SDK_CONTAINER
],
initialParallelism,
- "${DOCKER_CONTAINER_REGISTRY}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
+ "${DOCKER_BEAM_JOBSERVER}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
// Execute all scenarios connected with initial parallelism.
loadTestsBuilder.loadTests(scope, CommonTestProperties.SDK.GO, initialScenarios, 'combine', mode)
diff --git a/.test-infra/jenkins/job_LoadTests_Combine_Flink_Python.groovy b/.test-infra/jenkins/job_LoadTests_Combine_Flink_Python.groovy
index ff7cc5a9e79..54b92fdade2 100644
--- a/.test-infra/jenkins/job_LoadTests_Combine_Flink_Python.groovy
+++ b/.test-infra/jenkins/job_LoadTests_Combine_Flink_Python.groovy
@@ -24,6 +24,7 @@ import Flink
import InfluxDBCredentialsHelper
import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY
+import static LoadTestsBuilder.DOCKER_BEAM_JOBSERVER
import static LoadTestsBuilder.DOCKER_BEAM_SDK_IMAGE
String now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC'))
@@ -135,7 +136,7 @@ def loadTestJob = { scope, triggeringContext, mode ->
"${DOCKER_CONTAINER_REGISTRY}/${DOCKER_BEAM_SDK_IMAGE}"
],
initialParallelism,
- "${DOCKER_CONTAINER_REGISTRY}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
+ "${DOCKER_BEAM_JOBSERVER}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
// Execute all scenarios connected with initial parallelism.
loadTestsBuilder.loadTests(scope, CommonTestProperties.SDK.PYTHON, initialScenarios, 'Combine', mode)
diff --git a/.test-infra/jenkins/job_LoadTests_GBK_Flink_Go.groovy b/.test-infra/jenkins/job_LoadTests_GBK_Flink_Go.groovy
index 59fbeb75bc2..d5a6910b2a0 100644
--- a/.test-infra/jenkins/job_LoadTests_GBK_Flink_Go.groovy
+++ b/.test-infra/jenkins/job_LoadTests_GBK_Flink_Go.groovy
@@ -23,7 +23,7 @@ import PhraseTriggeringPostCommitBuilder
import Flink
import InfluxDBCredentialsHelper
-import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY
+import static LoadTestsBuilder.DOCKER_BEAM_JOBSERVER
import static LoadTestsBuilder.GO_SDK_CONTAINER
String now = new Date().format('MMddHHmmss', TimeZone.getTimeZone('UTC'))
@@ -199,7 +199,7 @@ def loadTestJob = { scope, triggeringContext, mode ->
GO_SDK_CONTAINER
],
initialParallelism,
- "${DOCKER_CONTAINER_REGISTRY}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
+ "${DOCKER_BEAM_JOBSERVER}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
// Execute all scenarios connected with initial parallelism.
loadTestsBuilder.loadTests(scope, CommonTestProperties.SDK.GO, initialScenarios, 'group_by_key', mode)
diff --git a/.test-infra/jenkins/job_LoadTests_GBK_Flink_Python.groovy b/.test-infra/jenkins/job_LoadTests_GBK_Flink_Python.groovy
index f4714a2d0e3..25e2647ebf3 100644
--- a/.test-infra/jenkins/job_LoadTests_GBK_Flink_Python.groovy
+++ b/.test-infra/jenkins/job_LoadTests_GBK_Flink_Python.groovy
@@ -24,6 +24,7 @@ import Flink
import InfluxDBCredentialsHelper
import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY
+import static LoadTestsBuilder.DOCKER_BEAM_JOBSERVER
import static LoadTestsBuilder.DOCKER_BEAM_SDK_IMAGE
String now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC'))
@@ -146,7 +147,7 @@ def loadTest = { scope, triggeringContext ->
"${DOCKER_CONTAINER_REGISTRY}/${DOCKER_BEAM_SDK_IMAGE}"
],
numberOfWorkers,
- "${DOCKER_CONTAINER_REGISTRY}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
+ "${DOCKER_BEAM_JOBSERVER}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
def configurations = testScenarios.findAll { it.pipelineOptions?.parallelism?.value == numberOfWorkers }
loadTestsBuilder.loadTests(scope, sdk, configurations, "GBK", "batch")
diff --git a/.test-infra/jenkins/job_LoadTests_ParDo_Flink_Go.groovy b/.test-infra/jenkins/job_LoadTests_ParDo_Flink_Go.groovy
index c5af6cc42d1..df20312f27b 100644
--- a/.test-infra/jenkins/job_LoadTests_ParDo_Flink_Go.groovy
+++ b/.test-infra/jenkins/job_LoadTests_ParDo_Flink_Go.groovy
@@ -23,7 +23,7 @@ import PhraseTriggeringPostCommitBuilder
import Flink
import InfluxDBCredentialsHelper
-import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY
+import static LoadTestsBuilder.DOCKER_BEAM_JOBSERVER
import static LoadTestsBuilder.GO_SDK_CONTAINER
String now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC'))
@@ -127,7 +127,7 @@ def loadTestJob = { scope, triggeringContext, mode ->
GO_SDK_CONTAINER
],
numberOfWorkers,
- "${DOCKER_CONTAINER_REGISTRY}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
+ "${DOCKER_BEAM_JOBSERVER}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
loadTestsBuilder.loadTests(scope, CommonTestProperties.SDK.GO, batchScenarios(), 'ParDo', mode)
}
diff --git a/.test-infra/jenkins/job_LoadTests_ParDo_Flink_Python.groovy b/.test-infra/jenkins/job_LoadTests_ParDo_Flink_Python.groovy
index 2979a16300c..4af2efd1be6 100644
--- a/.test-infra/jenkins/job_LoadTests_ParDo_Flink_Python.groovy
+++ b/.test-infra/jenkins/job_LoadTests_ParDo_Flink_Python.groovy
@@ -24,6 +24,7 @@ import Flink
import InfluxDBCredentialsHelper
import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY
+import static LoadTestsBuilder.DOCKER_BEAM_JOBSERVER
import static LoadTestsBuilder.DOCKER_BEAM_SDK_IMAGE
String now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC'))
@@ -329,7 +330,7 @@ def loadTestJob = { scope, triggeringContext, mode ->
"${DOCKER_CONTAINER_REGISTRY}/${DOCKER_BEAM_SDK_IMAGE}"
],
numberOfWorkers,
- "${DOCKER_CONTAINER_REGISTRY}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
+ "${DOCKER_BEAM_JOBSERVER}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
loadTestsBuilder.loadTests(scope, CommonTestProperties.SDK.PYTHON, testScenarios, 'ParDo', mode)
}
diff --git a/.test-infra/jenkins/job_LoadTests_SideInput_Flink_Go.groovy b/.test-infra/jenkins/job_LoadTests_SideInput_Flink_Go.groovy
index f2c008b4f25..bd0eaa4f23e 100644
--- a/.test-infra/jenkins/job_LoadTests_SideInput_Flink_Go.groovy
+++ b/.test-infra/jenkins/job_LoadTests_SideInput_Flink_Go.groovy
@@ -22,7 +22,7 @@ import LoadTestsBuilder as loadTestsBuilder
import PhraseTriggeringPostCommitBuilder
import InfluxDBCredentialsHelper
-import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY
+import static LoadTestsBuilder.DOCKER_BEAM_JOBSERVER
import static LoadTestsBuilder.GO_SDK_CONTAINER
def now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC'))
@@ -79,7 +79,7 @@ def loadTestJob = { scope, triggeringContext, mode ->
GO_SDK_CONTAINER
],
numberOfWorkers,
- "${DOCKER_CONTAINER_REGISTRY}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
+ "${DOCKER_BEAM_JOBSERVER}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
loadTestsBuilder.loadTests(scope, CommonTestProperties.SDK.GO,
batchScenarios(), 'SideInput', mode)
diff --git a/.test-infra/jenkins/job_LoadTests_coGBK_Flink_Go.groovy b/.test-infra/jenkins/job_LoadTests_coGBK_Flink_Go.groovy
index 95468a9d2f2..8c7a60e724f 100644
--- a/.test-infra/jenkins/job_LoadTests_coGBK_Flink_Go.groovy
+++ b/.test-infra/jenkins/job_LoadTests_coGBK_Flink_Go.groovy
@@ -23,7 +23,7 @@ import PhraseTriggeringPostCommitBuilder
import Flink
import InfluxDBCredentialsHelper
-import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY
+import static LoadTestsBuilder.DOCKER_BEAM_JOBSERVER
import static LoadTestsBuilder.GO_SDK_CONTAINER
String now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC'))
@@ -159,7 +159,7 @@ def loadTestJob = { scope, triggeringContext, mode ->
GO_SDK_CONTAINER
],
numberOfWorkers,
- "${DOCKER_CONTAINER_REGISTRY}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
+ "${DOCKER_BEAM_JOBSERVER}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
loadTestsBuilder.loadTests(scope, CommonTestProperties.SDK.GO, batchScenarios(), 'CoGBK', mode)
}
diff --git a/.test-infra/jenkins/job_LoadTests_coGBK_Flink_Python.groovy b/.test-infra/jenkins/job_LoadTests_coGBK_Flink_Python.groovy
index a0a06ea4f14..9a0798f8107 100644
--- a/.test-infra/jenkins/job_LoadTests_coGBK_Flink_Python.groovy
+++ b/.test-infra/jenkins/job_LoadTests_coGBK_Flink_Python.groovy
@@ -25,6 +25,7 @@ import InfluxDBCredentialsHelper
import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY
import static LoadTestsBuilder.DOCKER_BEAM_SDK_IMAGE
+import static LoadTestsBuilder.DOCKER_BEAM_JOBSERVER
String now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC'))
@@ -137,7 +138,7 @@ def loadTest = { scope, triggeringContext ->
"${DOCKER_CONTAINER_REGISTRY}/${DOCKER_BEAM_SDK_IMAGE}"
],
numberOfWorkers,
- "${DOCKER_CONTAINER_REGISTRY}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
+ "${DOCKER_BEAM_JOBSERVER}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
loadTestsBuilder.loadTests(scope, CommonTestProperties.SDK.PYTHON, testScenarios, 'CoGBK', 'batch')
}
diff --git a/.test-infra/jenkins/job_PostCommit_Python_Chicago_Taxi_Example_Flink.groovy b/.test-infra/jenkins/job_PostCommit_Python_Chicago_Taxi_Example_Flink.groovy
index c1b4a4496f2..6cf852a1689 100644
--- a/.test-infra/jenkins/job_PostCommit_Python_Chicago_Taxi_Example_Flink.groovy
+++ b/.test-infra/jenkins/job_PostCommit_Python_Chicago_Taxi_Example_Flink.groovy
@@ -22,6 +22,7 @@ import Flink
import LoadTestsBuilder
import PhraseTriggeringPostCommitBuilder
+import static LoadTestsBuilder.DOCKER_BEAM_JOBSERVER
import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY
import static PythonTestProperties.CHICAGO_TAXI_EXAMPLE_FLINK_PYTHON_VERSION
@@ -38,7 +39,7 @@ def chicagoTaxiJob = { scope ->
"${DOCKER_CONTAINER_REGISTRY}/${beamSdkDockerImage}"
],
numberOfWorkers,
- "${DOCKER_CONTAINER_REGISTRY}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
+ "${DOCKER_BEAM_JOBSERVER}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
def pipelineOptions = [
parallelism : numberOfWorkers,
diff --git a/.test-infra/jenkins/job_Publish_Docker_Snapshots.groovy b/.test-infra/jenkins/job_Publish_Docker_Snapshots.groovy
index e313b939255..510acd8f37d 100644
--- a/.test-infra/jenkins/job_Publish_Docker_Snapshots.groovy
+++ b/.test-infra/jenkins/job_Publish_Docker_Snapshots.groovy
@@ -41,9 +41,7 @@ job('beam_Publish_Docker_Snapshots') {
gradle {
rootBuildScriptDir(commonJobProperties.checkoutDir)
commonJobProperties.setGradleSwitches(delegate)
- SUPPORTED_CONTAINER_TASKS.each { taskVer ->
- tasks(":sdks:python:container:${taskVer}:dockerPush")
- }
+ tasks(":runners:spark:${CommonTestProperties.getSparkVersion()}:job-server:container:dockerPush")
tasks(":runners:flink:${CommonTestProperties.getFlinkVersion()}:job-server-container:dockerPush")
switches("-Pdocker-repository-root=gcr.io/apache-beam-testing/beam_portability")
switches("-Pdocker-tag=latest")
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerCommand.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerCommand.java
index bdd995b4b7c..e186a2924a1 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerCommand.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerCommand.java
@@ -53,20 +53,28 @@ class DockerCommand {
// but we _should_ always capture full ids.
private static final Pattern CONTAINER_ID_PATTERN = Pattern.compile("\\p{XDigit}{64}");
+ /**
+ * Return a DockerCommand instance with default timeout settings: pull timeout 10 min and other
+ * command timeout 2 min.
+ */
public static DockerCommand getDefault() {
- return forExecutable(DEFAULT_DOCKER_COMMAND, Duration.ofMinutes(2));
+ return forExecutable(DEFAULT_DOCKER_COMMAND, Duration.ofMinutes(2), Duration.ofMinutes(10));
}
- static DockerCommand forExecutable(String dockerExecutable, Duration commandTimeout) {
- return new DockerCommand(dockerExecutable, commandTimeout);
+ static DockerCommand forExecutable(
+ String dockerExecutable, Duration commandTimeout, Duration pullTimeout) {
+ return new DockerCommand(dockerExecutable, commandTimeout, pullTimeout);
}
private final String dockerExecutable;
private final Duration commandTimeout;
+ // pull remote image can take longer time
+ private final Duration pullTimeout;
- private DockerCommand(String dockerExecutable, Duration commandTimeout) {
+ private DockerCommand(String dockerExecutable, Duration commandTimeout, Duration pullTimeout) {
this.dockerExecutable = dockerExecutable;
this.commandTimeout = commandTimeout;
+ this.pullTimeout = pullTimeout;
}
/**
@@ -83,7 +91,8 @@ class DockerCommand {
// Pull the image from docker repo. This will be no-op if the image already exists.
try {
runShortCommand(
- ImmutableList.<String>builder().add(dockerExecutable).add("pull").add(imageTag).build());
+ ImmutableList.<String>builder().add(dockerExecutable).add("pull").add(imageTag).build(),
+ pullTimeout);
} catch (IOException | TimeoutException | InterruptedException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Unable to pull docker image {}", imageTag, e);
@@ -134,7 +143,8 @@ class DockerCommand {
checkArgument(
CONTAINER_ID_PATTERN.matcher(containerId).matches(),
"Container ID must be a 64-character hexadecimal string");
- return runShortCommand(Arrays.asList(dockerExecutable, "logs", containerId), true, "\n");
+ return runShortCommand(
+ Arrays.asList(dockerExecutable, "logs", containerId), true, "\n", commandTimeout);
}
/**
@@ -168,7 +178,12 @@ class DockerCommand {
private String runShortCommand(List<String> invocation)
throws IOException, TimeoutException, InterruptedException {
- return runShortCommand(invocation, false, "");
+ return runShortCommand(invocation, false, "", commandTimeout);
+ }
+
+ private String runShortCommand(List<String> invocation, Duration timeout)
+ throws IOException, TimeoutException, InterruptedException {
+ return runShortCommand(invocation, false, "", timeout);
}
/**
@@ -182,7 +197,10 @@ class DockerCommand {
* @throws TimeoutException if command has not finished by {@link DockerCommand#commandTimeout}
*/
private String runShortCommand(
- List<String> invocation, boolean redirectErrorStream, CharSequence delimiter)
+ List<String> invocation,
+ boolean redirectErrorStream,
+ CharSequence delimiter,
+ Duration timeout)
throws IOException, TimeoutException, InterruptedException {
ProcessBuilder pb = new ProcessBuilder(invocation);
pb.redirectErrorStream(redirectErrorStream);
@@ -216,7 +234,7 @@ class DockerCommand {
});
}
// TODO: Retry on interrupt?
- boolean processDone = process.waitFor(commandTimeout.toMillis(), TimeUnit.MILLISECONDS);
+ boolean processDone = process.waitFor(timeout.toMillis(), TimeUnit.MILLISECONDS);
if (!processDone) {
process.destroy();
throw new TimeoutException(
@@ -228,7 +246,7 @@ class DockerCommand {
if (exitCode != 0) {
String errorString;
try {
- errorString = errorFuture.get(commandTimeout.toMillis(), TimeUnit.MILLISECONDS);
+ errorString = errorFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (Exception stderrEx) {
errorString =
String.format("Error capturing %s: %s", errorStringName, stderrEx.getMessage());
@@ -242,8 +260,7 @@ class DockerCommand {
errorString));
}
try {
- // TODO: Consider a stricter timeout.
- return resultString.get(commandTimeout.toMillis(), TimeUnit.MILLISECONDS);
+ return resultString.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
// Recast any exceptions in reading output as IOExceptions.