You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/05/31 15:58:01 UTC

[GitHub] [flink] afedulov opened a new pull request, #19856: [FLINK-27854][tests] Refactor FlinkImageBuilder and FlinkContainerBuilder

afedulov opened a new pull request, #19856:
URL: https://github.com/apache/flink/pull/19856

   ## What is the purpose of the change
   
   FlinkImageBuilder and FlinkContainerBuilder are tightly coupled with using flink-dist to create Flink images on the flight. Some configuration is currently "hardcoded" in those classes and cannot be controlled by the user creating the FlinkContainerTestEnvironment.  For externalized connectors, we need the setup to be more flexible and to also work with the existing images published to GHCR, without relying on flink-dist. 
   
   ## Brief change log
   
   This PR introduces a single configuration holder FlinkContainersConfig that:
   - enables more flexible control over the container properties
   - refactors out hardcoded configuration
   - allows test environments based on existing base images rather than on flink-dist
   
   ## Verifying this change
   
   Verified manually by running tests based on the new approach
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / **no**)
     - If yes, how is the feature documented? (not applicable / docs / **JavaDocs** / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] afedulov commented on a diff in pull request #19856: [FLINK-27854][tests] Refactor FlinkImageBuilder and FlinkContainerBuilder

Posted by GitBox <gi...@apache.org>.
afedulov commented on code in PR #19856:
URL: https://github.com/apache/flink/pull/19856#discussion_r896150381


##########
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkImageBuilder.java:
##########
@@ -149,15 +160,15 @@ public FlinkImageBuilder setTimeout(Duration timeout) {
     /** Use this image for building a JobManager. */
     public FlinkImageBuilder asJobManager() {
         checkStartupCommandNotSet();
-        this.startupCommand = "flink/bin/jobmanager.sh start-foreground && tail -f /dev/null";
+        this.startupCommand = "bin/jobmanager.sh start-foreground && tail -f /dev/null";

Review Comment:
   This is why I've added this:
   https://github.com/apache/flink/blob/88ac2f5ba12b0597fe63e92e8a0ae65a61576e99/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersBuilder.java#L207 
   It makes everything nicely deterministic in relation to the Flink home directory.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] afedulov commented on pull request #19856: [FLINK-27854][tests] Refactor FlinkImageBuilder and FlinkContainerBuilder

Posted by GitBox <gi...@apache.org>.
afedulov commented on PR #19856:
URL: https://github.com/apache/flink/pull/19856#issuecomment-1145055445

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] afedulov commented on a diff in pull request #19856: [FLINK-27854][tests] Refactor FlinkImageBuilder and FlinkContainerBuilder

Posted by GitBox <gi...@apache.org>.
afedulov commented on code in PR #19856:
URL: https://github.com/apache/flink/pull/19856#discussion_r906708437


##########
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersConfig.java:
##########
@@ -0,0 +1,599 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.flink.container;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.apache.flink.configuration.HeartbeatManagerOptions.HEARTBEAT_INTERVAL;
+import static org.apache.flink.configuration.HeartbeatManagerOptions.HEARTBEAT_TIMEOUT;
+import static org.apache.flink.configuration.JobManagerOptions.SLOT_REQUEST_TIMEOUT;
+import static org.apache.flink.configuration.MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL;
+
+/** The central configuration holder for Flink container-based test environments. */
+public class FlinkContainersConfig {

Review Comment:
   Hi @PatrickRen, I dropped `FlinkContainerBuilder` and introduced `TestcontainersSettings`, as you proposed.
   I would prefer to keep the builder patterns for the following reasons:
   1) It is one of the most widespread design patterns in general and we also use them everywhere in Flink. I do not think they are confusing for the users.
   2) I believe if anything, they reduce the potential confusion for the users. A user might change the settings later and expect the changes to take effect on the execution, but it won't because we already populated concrete underlying classes.  Immutability makes things more clear. 
   3) Inline declaration: FlinkContainers are mostly initialized as class fields and using the setter style would require every test implementation to either use static{} initialization blocks or to provide initialization methods similar to what is done with the [Flink config ](https://github.com/apache/flink/blob/89f782416bf5b600ce35e3e3e0edd4992019f2e6/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java#L99). I find it pretty convenient to be able to configure those test fields inline.
   
   Looking forward to your feedback!
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] PatrickRen commented on a diff in pull request #19856: [FLINK-27854][tests] Refactor FlinkImageBuilder and FlinkContainerBuilder

Posted by GitBox <gi...@apache.org>.
PatrickRen commented on code in PR #19856:
URL: https://github.com/apache/flink/pull/19856#discussion_r899226249


##########
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersConfig.java:
##########
@@ -0,0 +1,599 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.flink.container;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.apache.flink.configuration.HeartbeatManagerOptions.HEARTBEAT_INTERVAL;
+import static org.apache.flink.configuration.HeartbeatManagerOptions.HEARTBEAT_TIMEOUT;
+import static org.apache.flink.configuration.JobManagerOptions.SLOT_REQUEST_TIMEOUT;
+import static org.apache.flink.configuration.MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL;
+
+/** The central configuration holder for Flink container-based test environments. */
+public class FlinkContainersConfig {
+    private final String baseImage;
+    private final int numTaskManagers;
+    private final int numSlotsPerTaskManager;
+    private final Collection<String> jarPaths;
+    private final Configuration flinkConfiguration;
+    private final String taskManagerHostnamePrefix;
+    private final Boolean buildFromFlinkDist;
+    private final String flinkDistLocation;
+    private final String flinkHome;
+    private final String checkpointPath;
+    private final String haStoragePath;
+    private final Boolean zookeeperHA;
+    private final String zookeeperHostname;
+    private final Properties logProperties;
+
+    // Defaults
+    private static final long DEFAULT_METRIC_FETCHER_UPDATE_INTERVAL_MS = 1000L;
+    private static final long DEFAULT_SLOT_REQUEST_TIMEOUT_MS = 10_000L;
+    private static final long DEFAULT_HEARTBEAT_TIMEOUT_MS = 5_000L;
+    private static final long DEFAULT_HEARTBEAT_INTERVAL_MS = 1000L;
+    private static final MemorySize DEFAULT_TM_TOTAL_PROCESS_MEMORY = MemorySize.ofMebiBytes(1728);
+    private static final MemorySize DEFAULT_JM_TOTAL_PROCESS_MEMORY = MemorySize.ofMebiBytes(1600);
+
+    private static final int DEFAULT_NUM_TASK_MANAGERS = 1;
+    private static final int DEFAULT_NUM_SLOTS_PER_TASK_MANAGER = 1;
+    private static final String DEFAULT_TASK_MANAGERS_HOSTNAME_PREFIX = "taskmanager-";
+    private static final String DEFAULT_JOB_MANAGER_HOSTNAME = "jobmanager";
+    private static final String DEFAULT_BIND_ADDRESS = "0.0.0.0";
+    private static final String DEFAULT_FLINK_HOME = "/opt/flink";
+    private static final String DEFAULT_CHECKPOINT_PATH = DEFAULT_FLINK_HOME + "/checkpoint";
+    private static final String DEFAULT_HA_STORAGE_PATH = DEFAULT_FLINK_HOME + "/recovery";
+
+    private static final String DEFAULT_ZOOKEEPER_HOSTNAME = "zookeeper";
+    private static final String DEFAULT_ZOOKEEPER_QUORUM = DEFAULT_ZOOKEEPER_HOSTNAME + ":2181";
+    // --
+
+    private FlinkContainersConfig(Builder builder) {
+        baseImage = builder.baseImage;
+        numTaskManagers = builder.numTaskManagers;
+        numSlotsPerTaskManager = builder.numSlotsPerTaskManager;
+        jarPaths = builder.jarPaths;
+        flinkConfiguration = builder.flinkConfiguration;
+        taskManagerHostnamePrefix = builder.taskManagerHostnamePrefix;
+        buildFromFlinkDist = builder.buildFromFlinkDist;
+        flinkDistLocation = builder.flinkDistLocation;
+        flinkHome = builder.flinkHome;
+        checkpointPath = builder.checkpointPath;
+        haStoragePath = builder.haStoragePath;
+        zookeeperHA = builder.zookeeperHA;
+        zookeeperHostname = builder.zookeeperHostname;
+        logProperties = builder.logProperties;
+    }
+
+    /**
+     * Builder for {@code FlinkContainersConfig}.
+     *
+     * @return The builder.
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * {@code FlinkContainersConfig} based on defaults.
+     *
+     * @return The Flink containers config.
+     */
+    public static FlinkContainersConfig defaultConfig() {
+        return builder().build();
+    }
+
+    /**
+     * {@code FlinkContainersConfig} based on provided Flink configuration.
+     *
+     * @param config The config.
+     * @return The flink containers config.
+     */
+    public static FlinkContainersConfig basedOn(Configuration config) {
+        return builder().basedOn(config).build();
+    }
+
+    /** {@code FlinkContainersConfig} builder static inner class. */
+    public static final class Builder {
+        private String baseImage;
+        private int numTaskManagers = DEFAULT_NUM_TASK_MANAGERS;
+        private int numSlotsPerTaskManager = DEFAULT_NUM_SLOTS_PER_TASK_MANAGER;
+        private Collection<String> jarPaths = new ArrayList<>();
+        private Configuration flinkConfiguration = defaultFlinkConfig();
+        private String taskManagerHostnamePrefix = DEFAULT_TASK_MANAGERS_HOSTNAME_PREFIX;
+        private Boolean buildFromFlinkDist = true;
+        private String flinkDistLocation;
+        private String flinkHome = DEFAULT_FLINK_HOME;
+        private String checkpointPath = DEFAULT_CHECKPOINT_PATH;
+        private String haStoragePath = DEFAULT_HA_STORAGE_PATH;
+        private Boolean zookeeperHA = false;
+        private String zookeeperHostname = DEFAULT_ZOOKEEPER_HOSTNAME;
+        private Properties logProperties = defaultLoggingProperties();
+
+        private Builder() {}
+
+        /**
+         * Sets the {@code baseImage} and returns a reference to this Builder enabling method
+         * chaining.
+         *
+         * @param baseImage The {@code baseImage} to set.
+         * @return A reference to this Builder.
+         */
+        public Builder baseImage(String baseImage) {
+            this.baseImage = baseImage;
+            this.buildFromFlinkDist = false;
+            return this;
+        }
+
+        /**
+         * Sets the {@code flinkDistLocation} and returns a reference to this Builder enabling
+         * method chaining.
+         *
+         * @param flinkDistLocation The {@code flinkDistLocation} to set.
+         * @return A reference to this Builder.
+         */
+        public Builder flinkDistLocation(String flinkDistLocation) {

Review Comment:
   Make sense. We can make the JavaDoc more descriptive here then.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] afedulov commented on a diff in pull request #19856: [FLINK-27854][tests] Refactor FlinkImageBuilder and FlinkContainerBuilder

Posted by GitBox <gi...@apache.org>.
afedulov commented on code in PR #19856:
URL: https://github.com/apache/flink/pull/19856#discussion_r896150381


##########
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkImageBuilder.java:
##########
@@ -149,15 +160,15 @@ public FlinkImageBuilder setTimeout(Duration timeout) {
     /** Use this image for building a JobManager. */
     public FlinkImageBuilder asJobManager() {
         checkStartupCommandNotSet();
-        this.startupCommand = "flink/bin/jobmanager.sh start-foreground && tail -f /dev/null";
+        this.startupCommand = "bin/jobmanager.sh start-foreground && tail -f /dev/null";

Review Comment:
   This is why I've added this:
   https://github.com/apache/flink/blob/88ac2f5ba12b0597fe63e92e8a0ae65a61576e99/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersBuilder.java#L207 
   It makes all further actions nicely deterministic in relation to the Flink home directory.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] afedulov commented on a diff in pull request #19856: [FLINK-27854][tests] Refactor FlinkImageBuilder and FlinkContainerBuilder

Posted by GitBox <gi...@apache.org>.
afedulov commented on code in PR #19856:
URL: https://github.com/apache/flink/pull/19856#discussion_r894608652


##########
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersConfig.java:
##########
@@ -0,0 +1,599 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.flink.container;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.apache.flink.configuration.HeartbeatManagerOptions.HEARTBEAT_INTERVAL;
+import static org.apache.flink.configuration.HeartbeatManagerOptions.HEARTBEAT_TIMEOUT;
+import static org.apache.flink.configuration.JobManagerOptions.SLOT_REQUEST_TIMEOUT;
+import static org.apache.flink.configuration.MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL;
+
+/** The central configuration holder for Flink container-based test environments. */
+public class FlinkContainersConfig {

Review Comment:
   @PatrickRen thanks for the review.
   I was also a bit undecided about what to do with the `FlinkContainersBuilder`. One factor is that I wanted to keep `FlinkContainerConfig` independent of any Testcontainer-specific classes (`Network`, `GenericContainer`) - the idea is to have it purely as a generalized configuration for Flink containers and not tie it into any particular testing framework implementation.  I removed `setBaseImage` as another generic parameter, but am not convinced that  moving the rest of them to `FlinkContainerConfig` is the right move. 
   
   Another question, if we decide to get rid of `FlinkContainersBuilder`, is where to place all of it's methods. `FlinkContainers` is mainly concerned with the lifecycle management of a the containers and interaction with them (job submission, cleanup etc.) and it is already a sizeable class. From the perspective of separation of concerns it makes sense to me to have a delegate that is responsible purely for the preparational steps that clearly preceded the responsibilities of `FlinkContainers`. Maybe `*Builder` is a confusing name, because it mixes both the builder pattern and the fact that we mean to actually _build_ the containers with Docker.
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] PatrickRen commented on a diff in pull request #19856: [FLINK-27854][tests] Refactor FlinkImageBuilder and FlinkContainerBuilder

Posted by GitBox <gi...@apache.org>.
PatrickRen commented on code in PR #19856:
URL: https://github.com/apache/flink/pull/19856#discussion_r899218732


##########
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersConfig.java:
##########
@@ -0,0 +1,599 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.flink.container;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.apache.flink.configuration.HeartbeatManagerOptions.HEARTBEAT_INTERVAL;
+import static org.apache.flink.configuration.HeartbeatManagerOptions.HEARTBEAT_TIMEOUT;
+import static org.apache.flink.configuration.JobManagerOptions.SLOT_REQUEST_TIMEOUT;
+import static org.apache.flink.configuration.MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL;
+
+/** The central configuration holder for Flink container-based test environments. */
+public class FlinkContainersConfig {

Review Comment:
   Thanks for the detailed explanation @afedulov ! I get your point.
   
   One solution in my mind is that we remove `FlinkContainersBuilder`, and move Testcontainers-specific setters to another class such as `TestcontainersSettings`. Also I think we can rename the `FlinkContainersConfig` to `FlinkContainersSettings` to avoid confusion with the config used for `flink-conf.yaml`. Then the constructor of `FlinkContainers` would be something like:
   
   ```java
   FlinkContainers flink = new FlinkContainers()
       .withFlinkContainerSettings(FlinkContainerSettings)
       .withTestContainerSettings(TestContainerSettings);
   
   flink.start();
   ```
   
   As of the preparation steps in the original builder I think we can hide them in a non-public class like `FlinkContainersComposer`, in order to avoid making `FlinkContainers` more sizable.
   
   Also about `FlinkContainersConfig / Settings` I think getter / setter pattern would be good enough. It's just a holder of settings and introducing a builder layer might be confusing and redundant to users. 
   
   WDYT? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] afedulov commented on pull request #19856: [FLINK-27854][tests] Refactor FlinkImageBuilder and FlinkContainerBuilder

Posted by GitBox <gi...@apache.org>.
afedulov commented on PR #19856:
URL: https://github.com/apache/flink/pull/19856#issuecomment-1171279105

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] afedulov commented on a diff in pull request #19856: [FLINK-27854][tests] Refactor FlinkImageBuilder and FlinkContainerBuilder

Posted by GitBox <gi...@apache.org>.
afedulov commented on code in PR #19856:
URL: https://github.com/apache/flink/pull/19856#discussion_r908469149


##########
flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java:
##########
@@ -91,17 +92,20 @@ public class KinesisStreamsTableApiIT {
                     .withNetwork(network)
                     .withNetworkAliases(INTER_CONTAINER_KINESALITE_ALIAS);
 
-    public static final FlinkContainers FLINK =
-            FlinkContainers.builder()
+    public static final TestcontainersSettings TESTCONTAINERS_SETTINGS =
+            TestcontainersSettings.builder()
                     .setEnvironmentVariable("AWS_CBOR_DISABLE", "1")
                     .setEnvironmentVariable(
                             "FLINK_ENV_JAVA_OPTS",
-                            "-Dorg.apache.flink.kinesis-streams.shaded.com.amazonaws.sdk.disableCertChecking -Daws.cborEnabled=false")
-                    .setNetwork(network)
-                    .setLogger(LOGGER)
+                            "-Dorg.apache.flink.kinesis-firehose.shaded.com.amazonaws.sdk.disableCertChecking -Daws.cborEnabled=false")

Review Comment:
   No, good catch :+1: 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] PatrickRen commented on a diff in pull request #19856: [FLINK-27854][tests] Refactor FlinkImageBuilder and FlinkContainerBuilder

Posted by GitBox <gi...@apache.org>.
PatrickRen commented on code in PR #19856:
URL: https://github.com/apache/flink/pull/19856#discussion_r908016836


##########
flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java:
##########
@@ -91,17 +92,20 @@ public class KinesisStreamsTableApiIT {
                     .withNetwork(network)
                     .withNetworkAliases(INTER_CONTAINER_KINESALITE_ALIAS);
 
-    public static final FlinkContainers FLINK =
-            FlinkContainers.builder()
+    public static final TestcontainersSettings TESTCONTAINERS_SETTINGS =
+            TestcontainersSettings.builder()
                     .setEnvironmentVariable("AWS_CBOR_DISABLE", "1")
                     .setEnvironmentVariable(
                             "FLINK_ENV_JAVA_OPTS",
-                            "-Dorg.apache.flink.kinesis-streams.shaded.com.amazonaws.sdk.disableCertChecking -Daws.cborEnabled=false")
-                    .setNetwork(network)
-                    .setLogger(LOGGER)
+                            "-Dorg.apache.flink.kinesis-firehose.shaded.com.amazonaws.sdk.disableCertChecking -Daws.cborEnabled=false")

Review Comment:
   Is this change expected?



##########
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkTestContainersConfigurator.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.flink.container;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+
+import org.slf4j.Logger;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.images.builder.ImageFromDockerfile;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Orchestrates configuration of Flink containers within Testcontainers framework. */
+class FlinkTestContainersConfigurator {

Review Comment:
   What about `FlinkContainersConfigurator`?



##########
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainers.java:
##########
@@ -128,9 +128,37 @@ public class FlinkContainers implements BeforeAllCallback, AfterAllCallback {
     @Nullable private RestClusterClient<StandaloneClusterId> restClusterClient;
     private boolean isStarted = false;
 
+    // TODO: update Class JavaDocs when design is fixed
+    /** The FlinkContainers builder. */
+    public static final class Builder {
+        private FlinkContainersSettings flinkContainersSettings =
+                FlinkContainersSettings.defaultConfig();
+        private TestcontainersSettings testcontainersSettings =
+                TestcontainersSettings.defaultSettings();
+
+        private Builder() {}
+
+        public Builder withFlinkContainerSettings(FlinkContainersSettings flinkContainerSettings) {

Review Comment:
   It looks like the naming of methods and classes are not aligned (withFlink**Container**Settings vs. Flink**Containers**Settings / with**Testcontainer**Settings vs. **Testcontainers**Settings)



##########
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/TestcontainersSettings.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.flink.container;
+
+import org.slf4j.Logger;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/** The type Test containers settings. */
+public class TestcontainersSettings {
+
+    private Network network;
+    private Logger logger;
+    private String baseImage;
+    private Map<String, String> envVars;
+    private Collection<GenericContainer<?>> dependingContainers;
+
+    private TestcontainersSettings(Builder builder) {
+        network = builder.network;
+        baseImage = builder.baseImage;
+        logger = builder.logger;
+        envVars = builder.envVars;
+        dependingContainers = builder.dependingContainers;
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static TestcontainersSettings defaultSettings() {
+        return builder().build();
+    }
+
+    /** {@code TestContainersSettings} builder static inner class. */
+    public static final class Builder {
+        private Network network = Network.newNetwork();
+        private String baseImage;
+        private Logger logger;
+        private final Map<String, String> envVars = new HashMap<>();
+        private Collection<GenericContainer<?>> dependingContainers = new ArrayList<>();
+
+        private Builder() {}
+
+        /** Sets environment variable to containers. */
+        public Builder setEnvironmentVariable(String key, String value) {

Review Comment:
   What about using `environmentVariable` to align with other methods? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] afedulov commented on a diff in pull request #19856: [FLINK-27854][tests] Refactor FlinkImageBuilder and FlinkContainerBuilder

Posted by GitBox <gi...@apache.org>.
afedulov commented on code in PR #19856:
URL: https://github.com/apache/flink/pull/19856#discussion_r906708437


##########
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersConfig.java:
##########
@@ -0,0 +1,599 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.flink.container;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.apache.flink.configuration.HeartbeatManagerOptions.HEARTBEAT_INTERVAL;
+import static org.apache.flink.configuration.HeartbeatManagerOptions.HEARTBEAT_TIMEOUT;
+import static org.apache.flink.configuration.JobManagerOptions.SLOT_REQUEST_TIMEOUT;
+import static org.apache.flink.configuration.MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL;
+
+/** The central configuration holder for Flink container-based test environments. */
+public class FlinkContainersConfig {

Review Comment:
   Hi @PatrickRen, I dropped `FlinkContainerBuilder` and introduced `TestcontainersSettings`, as you proposed.
   I would prefer to keep the builder patterns for the following reasons:
   1) It is one of the most widespread design patterns in general and we also use them everywhere in Flink. I do not think they are confusing for the users.
   2) I believe if anything, they reduce the potential confusion for the users. A user might change the settings later and expect the changes to take effect on the execution, but it won't because we already populated concrete underlying classes with it.  Immutability makes things more clear. 
   3) Inline declaration: FlinkContainers are mostly initialized as class fields and using the setter style would require every test implementation to either use static{} initialization blocks or to provide initialization methods similar to what is done with the [Flink config ](https://github.com/apache/flink/blob/89f782416bf5b600ce35e3e3e0edd4992019f2e6/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java#L99). I find it pretty convenient to be able to configure those test fields inline.
   
   Looking forward to your feedback!
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] afedulov commented on a diff in pull request #19856: [FLINK-27854][tests] Refactor FlinkImageBuilder and FlinkContainerBuilder

Posted by GitBox <gi...@apache.org>.
afedulov commented on code in PR #19856:
URL: https://github.com/apache/flink/pull/19856#discussion_r906708437


##########
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersConfig.java:
##########
@@ -0,0 +1,599 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.flink.container;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.apache.flink.configuration.HeartbeatManagerOptions.HEARTBEAT_INTERVAL;
+import static org.apache.flink.configuration.HeartbeatManagerOptions.HEARTBEAT_TIMEOUT;
+import static org.apache.flink.configuration.JobManagerOptions.SLOT_REQUEST_TIMEOUT;
+import static org.apache.flink.configuration.MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL;
+
+/** The central configuration holder for Flink container-based test environments. */
+public class FlinkContainersConfig {

Review Comment:
   Hi @PatrickRen, I dropped `FlinkContainerBuilder` and introduced `TestcontainersSettings`, as you proposed.
   I would prefer to keep the builder patterns for the following reasons:
   1) It is one of the most widespread design patterns in general and we also use them everywhere in Flink. I do not think they are confusing for the users.
   2) I believe if anything, they reduce the potential confusion for the users. A user might change the settings later and expect the changes to take effect on the execution, but it won't because we already populated concrete underlying classes.  Immutability makes things more clear. 
   3) Inline declaration: FlinkContainers are mostly initialized as class fields and using the setter style would require every test implementation to either use static{} initialization blocks or to provide initialization methods similar to what is done with the [Flink config ](https://github.com/apache/flink/blob/89f782416bf5b600ce35e3e3e0edd4992019f2e6/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java#L99). I find it convenient to be able to configure those test fields inline.
   
   Looking forward to your feedback!
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] afedulov commented on pull request #19856: [FLINK-27854][tests] Refactor FlinkImageBuilder and FlinkContainerBuilder

Posted by GitBox <gi...@apache.org>.
afedulov commented on PR #19856:
URL: https://github.com/apache/flink/pull/19856#issuecomment-1148713215

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] afedulov commented on pull request #19856: [FLINK-27854][tests] Refactor FlinkImageBuilder and FlinkContainerBuilder

Posted by GitBox <gi...@apache.org>.
afedulov commented on PR #19856:
URL: https://github.com/apache/flink/pull/19856#issuecomment-1154456010

   @PatrickRen could you please take a look at my comments above?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] PatrickRen commented on a diff in pull request #19856: [FLINK-27854][tests] Refactor FlinkImageBuilder and FlinkContainerBuilder

Posted by GitBox <gi...@apache.org>.
PatrickRen commented on code in PR #19856:
URL: https://github.com/apache/flink/pull/19856#discussion_r892569574


##########
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersConfig.java:
##########
@@ -0,0 +1,599 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.flink.container;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.apache.flink.configuration.HeartbeatManagerOptions.HEARTBEAT_INTERVAL;
+import static org.apache.flink.configuration.HeartbeatManagerOptions.HEARTBEAT_TIMEOUT;
+import static org.apache.flink.configuration.JobManagerOptions.SLOT_REQUEST_TIMEOUT;
+import static org.apache.flink.configuration.MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL;
+
+/** The central configuration holder for Flink container-based test environments. */
+public class FlinkContainersConfig {
+    private final String baseImage;
+    private final int numTaskManagers;
+    private final int numSlotsPerTaskManager;
+    private final Collection<String> jarPaths;
+    private final Configuration flinkConfiguration;
+    private final String taskManagerHostnamePrefix;
+    private final Boolean buildFromFlinkDist;
+    private final String flinkDistLocation;
+    private final String flinkHome;
+    private final String checkpointPath;
+    private final String haStoragePath;
+    private final Boolean zookeeperHA;
+    private final String zookeeperHostname;
+    private final Properties logProperties;
+
+    // Defaults
+    private static final long DEFAULT_METRIC_FETCHER_UPDATE_INTERVAL_MS = 1000L;
+    private static final long DEFAULT_SLOT_REQUEST_TIMEOUT_MS = 10_000L;
+    private static final long DEFAULT_HEARTBEAT_TIMEOUT_MS = 5_000L;
+    private static final long DEFAULT_HEARTBEAT_INTERVAL_MS = 1000L;
+    private static final MemorySize DEFAULT_TM_TOTAL_PROCESS_MEMORY = MemorySize.ofMebiBytes(1728);
+    private static final MemorySize DEFAULT_JM_TOTAL_PROCESS_MEMORY = MemorySize.ofMebiBytes(1600);
+
+    private static final int DEFAULT_NUM_TASK_MANAGERS = 1;
+    private static final int DEFAULT_NUM_SLOTS_PER_TASK_MANAGER = 1;
+    private static final String DEFAULT_TASK_MANAGERS_HOSTNAME_PREFIX = "taskmanager-";
+    private static final String DEFAULT_JOB_MANAGER_HOSTNAME = "jobmanager";
+    private static final String DEFAULT_BIND_ADDRESS = "0.0.0.0";
+    private static final String DEFAULT_FLINK_HOME = "/opt/flink";
+    private static final String DEFAULT_CHECKPOINT_PATH = DEFAULT_FLINK_HOME + "/checkpoint";
+    private static final String DEFAULT_HA_STORAGE_PATH = DEFAULT_FLINK_HOME + "/recovery";
+
+    private static final String DEFAULT_ZOOKEEPER_HOSTNAME = "zookeeper";
+    private static final String DEFAULT_ZOOKEEPER_QUORUM = DEFAULT_ZOOKEEPER_HOSTNAME + ":2181";
+    // --
+
+    private FlinkContainersConfig(Builder builder) {
+        baseImage = builder.baseImage;
+        numTaskManagers = builder.numTaskManagers;
+        numSlotsPerTaskManager = builder.numSlotsPerTaskManager;
+        jarPaths = builder.jarPaths;
+        flinkConfiguration = builder.flinkConfiguration;
+        taskManagerHostnamePrefix = builder.taskManagerHostnamePrefix;
+        buildFromFlinkDist = builder.buildFromFlinkDist;
+        flinkDistLocation = builder.flinkDistLocation;
+        flinkHome = builder.flinkHome;
+        checkpointPath = builder.checkpointPath;
+        haStoragePath = builder.haStoragePath;
+        zookeeperHA = builder.zookeeperHA;
+        zookeeperHostname = builder.zookeeperHostname;
+        logProperties = builder.logProperties;
+    }
+
+    /**
+     * Builder for {@code FlinkContainersConfig}.
+     *
+     * @return The builder.
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * {@code FlinkContainersConfig} based on defaults.
+     *
+     * @return The Flink containers config.
+     */
+    public static FlinkContainersConfig defaultConfig() {
+        return builder().build();
+    }
+
+    /**
+     * {@code FlinkContainersConfig} based on provided Flink configuration.
+     *
+     * @param config The config.
+     * @return The flink containers config.
+     */
+    public static FlinkContainersConfig basedOn(Configuration config) {
+        return builder().basedOn(config).build();
+    }
+
+    /** {@code FlinkContainersConfig} builder static inner class. */
+    public static final class Builder {
+        private String baseImage;
+        private int numTaskManagers = DEFAULT_NUM_TASK_MANAGERS;
+        private int numSlotsPerTaskManager = DEFAULT_NUM_SLOTS_PER_TASK_MANAGER;
+        private Collection<String> jarPaths = new ArrayList<>();
+        private Configuration flinkConfiguration = defaultFlinkConfig();
+        private String taskManagerHostnamePrefix = DEFAULT_TASK_MANAGERS_HOSTNAME_PREFIX;
+        private Boolean buildFromFlinkDist = true;
+        private String flinkDistLocation;
+        private String flinkHome = DEFAULT_FLINK_HOME;
+        private String checkpointPath = DEFAULT_CHECKPOINT_PATH;
+        private String haStoragePath = DEFAULT_HA_STORAGE_PATH;
+        private Boolean zookeeperHA = false;
+        private String zookeeperHostname = DEFAULT_ZOOKEEPER_HOSTNAME;
+        private Properties logProperties = defaultLoggingProperties();
+
+        private Builder() {}
+
+        /**
+         * Sets the {@code baseImage} and returns a reference to this Builder enabling method
+         * chaining.
+         *
+         * @param baseImage The {@code baseImage} to set.
+         * @return A reference to this Builder.
+         */
+        public Builder baseImage(String baseImage) {
+            this.baseImage = baseImage;
+            this.buildFromFlinkDist = false;
+            return this;
+        }
+
+        /**
+         * Sets the {@code flinkDistLocation} and returns a reference to this Builder enabling
+         * method chaining.
+         *
+         * @param flinkDistLocation The {@code flinkDistLocation} to set.
+         * @return A reference to this Builder.
+         */
+        public Builder flinkDistLocation(String flinkDistLocation) {

Review Comment:
   What about using `Path` as the parameter type? Same for `flinkHome`, `checkpointPath` and `haStoragePath`



##########
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkImageBuilder.java:
##########
@@ -149,15 +160,15 @@ public FlinkImageBuilder setTimeout(Duration timeout) {
     /** Use this image for building a JobManager. */
     public FlinkImageBuilder asJobManager() {
         checkStartupCommandNotSet();
-        this.startupCommand = "flink/bin/jobmanager.sh start-foreground && tail -f /dev/null";
+        this.startupCommand = "bin/jobmanager.sh start-foreground && tail -f /dev/null";

Review Comment:
   What about using absolute path here: `flinkHome + "bin/jobmanager.sh ...`? As users can pass customize base image here we are not sure where the workdir is. Same for the startup command of TaskManager.



##########
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersConfig.java:
##########
@@ -0,0 +1,599 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.flink.container;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.apache.flink.configuration.HeartbeatManagerOptions.HEARTBEAT_INTERVAL;
+import static org.apache.flink.configuration.HeartbeatManagerOptions.HEARTBEAT_TIMEOUT;
+import static org.apache.flink.configuration.JobManagerOptions.SLOT_REQUEST_TIMEOUT;
+import static org.apache.flink.configuration.MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL;
+
+/** The central configuration holder for Flink container-based test environments. */
+public class FlinkContainersConfig {

Review Comment:
   Actually I'm a little confused by the new `FlinkContainersConfig` 🤔 
   
   If I understand correctly, this new class is trying to unify the config across different container-based envs such as `FlinkContainers` and `FlinkContainersTestEnvironment`. This idea makes sense to me but I notice that `FlinkContainersBuilder` is kept. I think both the builder and the config are just syntax sugar to make the constructor of `FlinkContainers` and `FlinkContainersTestEnvironment` more human-readable, so is it possible to remove the `FlinkContainersBuilder` and move all paramters to `FlinkContainersConfig`?



##########
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkImageBuilder.java:
##########
@@ -80,12 +80,23 @@ public FlinkImageBuilder setTempDirectory(Path tempDirectory) {
     }
 
     /**
-     * Sets the name of building image.
+     * Sets flink home.
      *
-     * <p>If the name is not specified, {@link #DEFAULT_IMAGE_NAME} will be used.
+     * @param flinkHome The flink home.
+     * @return The flink home.
      */
-    public FlinkImageBuilder setImageName(String imageName) {
-        this.imageName = imageName;
+    public FlinkImageBuilder setFlinkHome(String flinkHome) {

Review Comment:
   Maybe use `Path` as parameter type here too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] afedulov commented on a diff in pull request #19856: [FLINK-27854][tests] Refactor FlinkImageBuilder and FlinkContainerBuilder

Posted by GitBox <gi...@apache.org>.
afedulov commented on code in PR #19856:
URL: https://github.com/apache/flink/pull/19856#discussion_r896140006


##########
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersConfig.java:
##########
@@ -0,0 +1,599 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.flink.container;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.apache.flink.configuration.HeartbeatManagerOptions.HEARTBEAT_INTERVAL;
+import static org.apache.flink.configuration.HeartbeatManagerOptions.HEARTBEAT_TIMEOUT;
+import static org.apache.flink.configuration.JobManagerOptions.SLOT_REQUEST_TIMEOUT;
+import static org.apache.flink.configuration.MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL;
+
+/** The central configuration holder for Flink container-based test environments. */
+public class FlinkContainersConfig {
+    private final String baseImage;
+    private final int numTaskManagers;
+    private final int numSlotsPerTaskManager;
+    private final Collection<String> jarPaths;
+    private final Configuration flinkConfiguration;
+    private final String taskManagerHostnamePrefix;
+    private final Boolean buildFromFlinkDist;
+    private final String flinkDistLocation;
+    private final String flinkHome;
+    private final String checkpointPath;
+    private final String haStoragePath;
+    private final Boolean zookeeperHA;
+    private final String zookeeperHostname;
+    private final Properties logProperties;
+
+    // Defaults
+    private static final long DEFAULT_METRIC_FETCHER_UPDATE_INTERVAL_MS = 1000L;
+    private static final long DEFAULT_SLOT_REQUEST_TIMEOUT_MS = 10_000L;
+    private static final long DEFAULT_HEARTBEAT_TIMEOUT_MS = 5_000L;
+    private static final long DEFAULT_HEARTBEAT_INTERVAL_MS = 1000L;
+    private static final MemorySize DEFAULT_TM_TOTAL_PROCESS_MEMORY = MemorySize.ofMebiBytes(1728);
+    private static final MemorySize DEFAULT_JM_TOTAL_PROCESS_MEMORY = MemorySize.ofMebiBytes(1600);
+
+    private static final int DEFAULT_NUM_TASK_MANAGERS = 1;
+    private static final int DEFAULT_NUM_SLOTS_PER_TASK_MANAGER = 1;
+    private static final String DEFAULT_TASK_MANAGERS_HOSTNAME_PREFIX = "taskmanager-";
+    private static final String DEFAULT_JOB_MANAGER_HOSTNAME = "jobmanager";
+    private static final String DEFAULT_BIND_ADDRESS = "0.0.0.0";
+    private static final String DEFAULT_FLINK_HOME = "/opt/flink";
+    private static final String DEFAULT_CHECKPOINT_PATH = DEFAULT_FLINK_HOME + "/checkpoint";
+    private static final String DEFAULT_HA_STORAGE_PATH = DEFAULT_FLINK_HOME + "/recovery";
+
+    private static final String DEFAULT_ZOOKEEPER_HOSTNAME = "zookeeper";
+    private static final String DEFAULT_ZOOKEEPER_QUORUM = DEFAULT_ZOOKEEPER_HOSTNAME + ":2181";
+    // --
+
+    private FlinkContainersConfig(Builder builder) {
+        baseImage = builder.baseImage;
+        numTaskManagers = builder.numTaskManagers;
+        numSlotsPerTaskManager = builder.numSlotsPerTaskManager;
+        jarPaths = builder.jarPaths;
+        flinkConfiguration = builder.flinkConfiguration;
+        taskManagerHostnamePrefix = builder.taskManagerHostnamePrefix;
+        buildFromFlinkDist = builder.buildFromFlinkDist;
+        flinkDistLocation = builder.flinkDistLocation;
+        flinkHome = builder.flinkHome;
+        checkpointPath = builder.checkpointPath;
+        haStoragePath = builder.haStoragePath;
+        zookeeperHA = builder.zookeeperHA;
+        zookeeperHostname = builder.zookeeperHostname;
+        logProperties = builder.logProperties;
+    }
+
+    /**
+     * Builder for {@code FlinkContainersConfig}.
+     *
+     * @return The builder.
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * {@code FlinkContainersConfig} based on defaults.
+     *
+     * @return The Flink containers config.
+     */
+    public static FlinkContainersConfig defaultConfig() {
+        return builder().build();
+    }
+
+    /**
+     * {@code FlinkContainersConfig} based on provided Flink configuration.
+     *
+     * @param config The config.
+     * @return The flink containers config.
+     */
+    public static FlinkContainersConfig basedOn(Configuration config) {
+        return builder().basedOn(config).build();
+    }
+
+    /** {@code FlinkContainersConfig} builder static inner class. */
+    public static final class Builder {
+        private String baseImage;
+        private int numTaskManagers = DEFAULT_NUM_TASK_MANAGERS;
+        private int numSlotsPerTaskManager = DEFAULT_NUM_SLOTS_PER_TASK_MANAGER;
+        private Collection<String> jarPaths = new ArrayList<>();
+        private Configuration flinkConfiguration = defaultFlinkConfig();
+        private String taskManagerHostnamePrefix = DEFAULT_TASK_MANAGERS_HOSTNAME_PREFIX;
+        private Boolean buildFromFlinkDist = true;
+        private String flinkDistLocation;
+        private String flinkHome = DEFAULT_FLINK_HOME;
+        private String checkpointPath = DEFAULT_CHECKPOINT_PATH;
+        private String haStoragePath = DEFAULT_HA_STORAGE_PATH;
+        private Boolean zookeeperHA = false;
+        private String zookeeperHostname = DEFAULT_ZOOKEEPER_HOSTNAME;
+        private Properties logProperties = defaultLoggingProperties();
+
+        private Builder() {}
+
+        /**
+         * Sets the {@code baseImage} and returns a reference to this Builder enabling method
+         * chaining.
+         *
+         * @param baseImage The {@code baseImage} to set.
+         * @return A reference to this Builder.
+         */
+        public Builder baseImage(String baseImage) {
+            this.baseImage = baseImage;
+            this.buildFromFlinkDist = false;
+            return this;
+        }
+
+        /**
+         * Sets the {@code flinkDistLocation} and returns a reference to this Builder enabling
+         * method chaining.
+         *
+         * @param flinkDistLocation The {@code flinkDistLocation} to set.
+         * @return A reference to this Builder.
+         */
+        public Builder flinkDistLocation(String flinkDistLocation) {

Review Comment:
   What would be the benefit of requiring users to pass Paths? My idea was to keep it simple since we know that the containers with Flink are all *nix based. In the end, what we care about is that the path is configured correctly for the container file system, however, the standard way of creating Paths ( `Paths.get()` ) uses `FileSystems.getDefault()` which makes it dependent on the target system of the user. I decided it should be less error-prone to ask for Strings with an exact and specific interpretation for this reason.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] afedulov commented on a diff in pull request #19856: [FLINK-27854][tests] Refactor FlinkImageBuilder and FlinkContainerBuilder

Posted by GitBox <gi...@apache.org>.
afedulov commented on code in PR #19856:
URL: https://github.com/apache/flink/pull/19856#discussion_r896140006


##########
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersConfig.java:
##########
@@ -0,0 +1,599 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.flink.container;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.apache.flink.configuration.HeartbeatManagerOptions.HEARTBEAT_INTERVAL;
+import static org.apache.flink.configuration.HeartbeatManagerOptions.HEARTBEAT_TIMEOUT;
+import static org.apache.flink.configuration.JobManagerOptions.SLOT_REQUEST_TIMEOUT;
+import static org.apache.flink.configuration.MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL;
+
+/** The central configuration holder for Flink container-based test environments. */
+public class FlinkContainersConfig {
+    private final String baseImage;
+    private final int numTaskManagers;
+    private final int numSlotsPerTaskManager;
+    private final Collection<String> jarPaths;
+    private final Configuration flinkConfiguration;
+    private final String taskManagerHostnamePrefix;
+    private final Boolean buildFromFlinkDist;
+    private final String flinkDistLocation;
+    private final String flinkHome;
+    private final String checkpointPath;
+    private final String haStoragePath;
+    private final Boolean zookeeperHA;
+    private final String zookeeperHostname;
+    private final Properties logProperties;
+
+    // Defaults
+    private static final long DEFAULT_METRIC_FETCHER_UPDATE_INTERVAL_MS = 1000L;
+    private static final long DEFAULT_SLOT_REQUEST_TIMEOUT_MS = 10_000L;
+    private static final long DEFAULT_HEARTBEAT_TIMEOUT_MS = 5_000L;
+    private static final long DEFAULT_HEARTBEAT_INTERVAL_MS = 1000L;
+    private static final MemorySize DEFAULT_TM_TOTAL_PROCESS_MEMORY = MemorySize.ofMebiBytes(1728);
+    private static final MemorySize DEFAULT_JM_TOTAL_PROCESS_MEMORY = MemorySize.ofMebiBytes(1600);
+
+    private static final int DEFAULT_NUM_TASK_MANAGERS = 1;
+    private static final int DEFAULT_NUM_SLOTS_PER_TASK_MANAGER = 1;
+    private static final String DEFAULT_TASK_MANAGERS_HOSTNAME_PREFIX = "taskmanager-";
+    private static final String DEFAULT_JOB_MANAGER_HOSTNAME = "jobmanager";
+    private static final String DEFAULT_BIND_ADDRESS = "0.0.0.0";
+    private static final String DEFAULT_FLINK_HOME = "/opt/flink";
+    private static final String DEFAULT_CHECKPOINT_PATH = DEFAULT_FLINK_HOME + "/checkpoint";
+    private static final String DEFAULT_HA_STORAGE_PATH = DEFAULT_FLINK_HOME + "/recovery";
+
+    private static final String DEFAULT_ZOOKEEPER_HOSTNAME = "zookeeper";
+    private static final String DEFAULT_ZOOKEEPER_QUORUM = DEFAULT_ZOOKEEPER_HOSTNAME + ":2181";
+    // --
+
+    private FlinkContainersConfig(Builder builder) {
+        baseImage = builder.baseImage;
+        numTaskManagers = builder.numTaskManagers;
+        numSlotsPerTaskManager = builder.numSlotsPerTaskManager;
+        jarPaths = builder.jarPaths;
+        flinkConfiguration = builder.flinkConfiguration;
+        taskManagerHostnamePrefix = builder.taskManagerHostnamePrefix;
+        buildFromFlinkDist = builder.buildFromFlinkDist;
+        flinkDistLocation = builder.flinkDistLocation;
+        flinkHome = builder.flinkHome;
+        checkpointPath = builder.checkpointPath;
+        haStoragePath = builder.haStoragePath;
+        zookeeperHA = builder.zookeeperHA;
+        zookeeperHostname = builder.zookeeperHostname;
+        logProperties = builder.logProperties;
+    }
+
+    /**
+     * Builder for {@code FlinkContainersConfig}.
+     *
+     * @return The builder.
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * {@code FlinkContainersConfig} based on defaults.
+     *
+     * @return The Flink containers config.
+     */
+    public static FlinkContainersConfig defaultConfig() {
+        return builder().build();
+    }
+
+    /**
+     * {@code FlinkContainersConfig} based on provided Flink configuration.
+     *
+     * @param config The config.
+     * @return The flink containers config.
+     */
+    public static FlinkContainersConfig basedOn(Configuration config) {
+        return builder().basedOn(config).build();
+    }
+
+    /** {@code FlinkContainersConfig} builder static inner class. */
+    public static final class Builder {
+        private String baseImage;
+        private int numTaskManagers = DEFAULT_NUM_TASK_MANAGERS;
+        private int numSlotsPerTaskManager = DEFAULT_NUM_SLOTS_PER_TASK_MANAGER;
+        private Collection<String> jarPaths = new ArrayList<>();
+        private Configuration flinkConfiguration = defaultFlinkConfig();
+        private String taskManagerHostnamePrefix = DEFAULT_TASK_MANAGERS_HOSTNAME_PREFIX;
+        private Boolean buildFromFlinkDist = true;
+        private String flinkDistLocation;
+        private String flinkHome = DEFAULT_FLINK_HOME;
+        private String checkpointPath = DEFAULT_CHECKPOINT_PATH;
+        private String haStoragePath = DEFAULT_HA_STORAGE_PATH;
+        private Boolean zookeeperHA = false;
+        private String zookeeperHostname = DEFAULT_ZOOKEEPER_HOSTNAME;
+        private Properties logProperties = defaultLoggingProperties();
+
+        private Builder() {}
+
+        /**
+         * Sets the {@code baseImage} and returns a reference to this Builder enabling method
+         * chaining.
+         *
+         * @param baseImage The {@code baseImage} to set.
+         * @return A reference to this Builder.
+         */
+        public Builder baseImage(String baseImage) {
+            this.baseImage = baseImage;
+            this.buildFromFlinkDist = false;
+            return this;
+        }
+
+        /**
+         * Sets the {@code flinkDistLocation} and returns a reference to this Builder enabling
+         * method chaining.
+         *
+         * @param flinkDistLocation The {@code flinkDistLocation} to set.
+         * @return A reference to this Builder.
+         */
+        public Builder flinkDistLocation(String flinkDistLocation) {

Review Comment:
   What would be the benefit of requiring users to pass Paths? My idea was to keep it simple since we know that the containers with Flink are all *nix based. In the end, what we care about is that the path is configured correctly for the container file system, however, the standard way of creating Paths ( `Paths.get()` ) uses `FileSystems.getDefault()` which makes it dependent on the target system of the user. I decided it should be less error-prone to ask for Strings with an exact interpretation for this reason.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] afedulov commented on a diff in pull request #19856: [FLINK-27854][tests] Refactor FlinkImageBuilder and FlinkContainerBuilder

Posted by GitBox <gi...@apache.org>.
afedulov commented on code in PR #19856:
URL: https://github.com/apache/flink/pull/19856#discussion_r896150381


##########
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkImageBuilder.java:
##########
@@ -149,15 +160,15 @@ public FlinkImageBuilder setTimeout(Duration timeout) {
     /** Use this image for building a JobManager. */
     public FlinkImageBuilder asJobManager() {
         checkStartupCommandNotSet();
-        this.startupCommand = "flink/bin/jobmanager.sh start-foreground && tail -f /dev/null";
+        this.startupCommand = "bin/jobmanager.sh start-foreground && tail -f /dev/null";

Review Comment:
   This is why I've added this:
   https://github.com/apache/flink/blob/88ac2f5ba12b0597fe63e92e8a0ae65a61576e99/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersBuilder.java#L207 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] afedulov commented on a diff in pull request #19856: [FLINK-27854][tests] Refactor FlinkImageBuilder and FlinkContainerBuilder

Posted by GitBox <gi...@apache.org>.
afedulov commented on code in PR #19856:
URL: https://github.com/apache/flink/pull/19856#discussion_r908464004


##########
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkTestContainersConfigurator.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.flink.container;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+
+import org.slf4j.Logger;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.images.builder.ImageFromDockerfile;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Orchestrates configuration of Flink containers within Testcontainers framework. */
+class FlinkTestContainersConfigurator {

Review Comment:
   Not sure. It actually is specific to Testcontainers (configures GenericContainers based on TestcontainersConfiguration).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] afedulov commented on a diff in pull request #19856: [FLINK-27854][tests] Refactor FlinkImageBuilder and FlinkContainerBuilder

Posted by GitBox <gi...@apache.org>.
afedulov commented on code in PR #19856:
URL: https://github.com/apache/flink/pull/19856#discussion_r906708437


##########
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersConfig.java:
##########
@@ -0,0 +1,599 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.flink.container;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.apache.flink.configuration.HeartbeatManagerOptions.HEARTBEAT_INTERVAL;
+import static org.apache.flink.configuration.HeartbeatManagerOptions.HEARTBEAT_TIMEOUT;
+import static org.apache.flink.configuration.JobManagerOptions.SLOT_REQUEST_TIMEOUT;
+import static org.apache.flink.configuration.MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL;
+
+/** The central configuration holder for Flink container-based test environments. */
+public class FlinkContainersConfig {

Review Comment:
   Hi @PatrickRen, I dropped `FlinkContainerBuilder` and introduced `TestcontainersSettings`, as you proposed.
   I would prefer to keep the builder patterns for the following reasons:
   1) It is one of the most widespread design patterns in general and we also use them everywhere in Flink. I do not think they are confusing for the users.
   2) I believe if anything, they reduce the potential confusion for the users. A user might change the settings later and expect the changes for taking effect on the execution, but it won't because we already populated concrete underlying classes with it.  Immutability makes things more clear. 
   3) Inline declaration: FlinkContainers are mostly initialized as class fields and using the setter style would require every test implementation to either use static{} initialization blocks or to provide initialization methods similar to what is done with the [Flink config ](https://github.com/apache/flink/blob/89f782416bf5b600ce35e3e3e0edd4992019f2e6/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java#L99). I find it pretty convenient to be able to configure those test fields inline.
   
   Looking forward to your feedback!
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] afedulov commented on pull request #19856: [FLINK-27854][tests] Refactor FlinkImageBuilder and FlinkContainerBuilder

Posted by GitBox <gi...@apache.org>.
afedulov commented on PR #19856:
URL: https://github.com/apache/flink/pull/19856#issuecomment-1145055226

   There seem to be some flakiness  in Pulsar tests. The same commit passed in my branch CI: https://dev.azure.com/alexanderfedulov/Flink/_build/results?buildId=291&view=results


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] flinkbot commented on pull request #19856: [FLINK-27854][tests] Refactor FlinkImageBuilder and FlinkContainerBuilder

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #19856:
URL: https://github.com/apache/flink/pull/19856#issuecomment-1142324089

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "fb8f1460b2b931a2e2975596736fed2186f107f6",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "fb8f1460b2b931a2e2975596736fed2186f107f6",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * fb8f1460b2b931a2e2975596736fed2186f107f6 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] PatrickRen merged pull request #19856: [FLINK-27854][tests] Refactor FlinkImageBuilder and FlinkContainerBuilder

Posted by GitBox <gi...@apache.org>.
PatrickRen merged PR #19856:
URL: https://github.com/apache/flink/pull/19856


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org