You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/18 13:48:13 UTC

[GitHub] [flink] becketqin commented on a change in pull request #13606: [FLINK-19554][e2e] Implement a unified testing framework for connectors

becketqin commented on a change in pull request #13606:
URL: https://github.com/apache/flink/pull/13606#discussion_r507004615



##########
File path: flink-end-to-end-tests/flink-connector-testing-framework/flink-connector-e2e-common/src/main/java/org/apache/flink/connectors/e2e/common/utils/SourceController.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.e2e.common.utils;
+
+import org.apache.flink.connectors.e2e.common.source.ControllableSource;
+import org.apache.flink.connectors.e2e.common.source.SourceControlRpc;
+
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.rmi.NotBoundException;
+import java.rmi.RemoteException;
+import java.rmi.registry.LocateRegistry;
+import java.time.Duration;
+import java.util.List;
+
+public class SourceController implements SourceControlRpc {

Review comment:
       Missing Java docs.

##########
File path: flink-end-to-end-tests/flink-connector-testing-framework/flink-connector-e2e-common/src/main/java/org/apache/flink/connectors/e2e/common/source/ControllableSource.java
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.e2e.common.source;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.Serializable;
+import java.rmi.RemoteException;

Review comment:
       unused import.

##########
File path: flink-end-to-end-tests/flink-connector-testing-framework/flink-connector-e2e-common/src/main/java/org/apache/flink/connectors/e2e/common/source/ControllableSource.java
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.e2e.common.source;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.Serializable;
+import java.rmi.RemoteException;
+import java.rmi.registry.LocateRegistry;
+import java.rmi.registry.Registry;
+import java.rmi.server.UnicastRemoteObject;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A Flink source that can be controlled by Java RMI.
+ *
+ * <p>This source is used for generating random records to downstream and recording records into a given local file.
+ * In order to control records to generate and lifecycle of the Flink job using this source, it integrates with
+ * an Java RMI server so that it could be controlled remotely through RPC (basically controlled by testing framework).
+ * </p>
+ */
+public class ControllableSource
+		extends AbstractRichFunction
+		implements SourceFunction<String>, CheckpointedFunction, SourceControlRpc {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ControllableSource.class);
+
+	/*---------------- Java RMI configurations ---------------*/
+	public static final String RMI_PROP_REGISTRY_FILTER = "sun.rmi.registry.registryFilter";
+	public static final String RMI_PROP_SERVER_HOSTNAME = "java.rmi.server.hostname";
+	public static final String RMI_REGISTRY_NAME = "SourceControl";
+	public static final String RMI_HOSTNAME = "127.0.0.1";
+	public static final int RMI_PORT = 15213;
+	private Registry rmiRegistry;
+
+	private volatile boolean isRunning = true;
+	private volatile boolean isStepping = true;
+
+	private final SyncLock syncLock = new SyncLock();
+
+	private final File recordingFile;
+	private BufferedWriter br;
+
+	private AtomicInteger numElementsToEmit;
+
+	private final String endMark;
+
+	public ControllableSource(String recordingFilePath, String endMark) {
+		recordingFile = new File(recordingFilePath);
+		this.endMark = endMark;
+	}
+
+	/*------------------- Checkpoint related---------------------*/
+	@Override
+	public void snapshotState(FunctionSnapshotContext context) {
+
+	}
+
+	@Override
+	public void initializeState(FunctionInitializationContext context) {
+
+	}
+
+
+	/*-------------------- Rich function related -----------------*/
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		// Setup Java RMI
+		java.lang.System.setProperty(RMI_PROP_REGISTRY_FILTER, "java.**;org.apache.flink.**");
+		java.lang.System.setProperty(RMI_PROP_SERVER_HOSTNAME, RMI_HOSTNAME);
+		UnicastRemoteObject.exportObject(this, RMI_PORT);
+		rmiRegistry = LocateRegistry.createRegistry(RMI_PORT);
+		rmiRegistry.bind(RMI_REGISTRY_NAME, this);
+
+		// Setup recording file
+		br = new BufferedWriter(new FileWriter(recordingFile));
+
+		// Setup record counter
+		numElementsToEmit = new AtomicInteger(0);
+	}
+
+	@Override
+	public void run(SourceContext<String> ctx) throws Exception {
+
+		// Main loop
+		while (isRunning) {
+			if (isStepping) {
+				// Step run
+				while (numElementsToEmit.get() > 0) {
+					emitAndRecordElement(ctx, generateRandomString(20));
+					numElementsToEmit.decrementAndGet();
+				}
+				// Double check the status before sleeping
+				if (isStepping) {
+					synchronized (syncLock) {
+						syncLock.wait();
+					}
+				}
+			} else {
+				// Continuous run
+				emitAndRecordElement(ctx, generateRandomString(10));
+			}
+		}
+
+		// Ready to finish the job
+		// Finish leftover
+		while (numElementsToEmit.get() > 0) {
+			emitAndRecordElement(ctx, generateRandomString(20));
+			numElementsToEmit.decrementAndGet();
+		}
+
+		// Emit end mark before exit
+		ctx.collect(endMark);
+	}
+
+	@Override
+	public void cancel() {
+		synchronized (syncLock) {
+			isRunning = false;
+			syncLock.notify();
+		}
+	}
+
+	@Override
+	public void close() throws Exception {
+		br.close();
+		UnicastRemoteObject.unexportObject(rmiRegistry, true);
+	}
+
+	/*------------------------- Java RMI related ----------------------*/
+	@Override
+	public void pause() {
+		LOG.info("Received command PAUSE");
+		isStepping = true;
+	}
+
+	@Override
+	public void next() {
+		LOG.info("Received command NEXT");
+		// if main thread is running, just ignore the request
+		if (!isStepping) {
+			return;
+		}
+		numElementsToEmit.incrementAndGet();
+		// Main thread maybe sleeping, wake it up
+		synchronized (syncLock) {
+			syncLock.notify();
+		}
+	}
+
+	@Override
+	public void go() {
+		LOG.info("Received command GO");
+		synchronized (syncLock) {
+			isStepping = false;
+			syncLock.notify();
+		}
+	}
+
+	@Override
+	public void finish() {
+		LOG.info("Received command FINISH");
+		synchronized (syncLock) {
+			isRunning = false;
+			syncLock.notify();
+		}
+	}
+
+	static class SyncLock implements Serializable {

Review comment:
       Any reason that we define a new lock class here?

##########
File path: flink-end-to-end-tests/flink-connector-testing-framework/pom.xml
##########
@@ -0,0 +1,108 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.flink</groupId>
+    <artifactId>flink-connector-testing-framework</artifactId>
+    <packaging>pom</packaging>
+    <version>0.1-SNAPSHOT</version>

Review comment:
       The version should be 1.12-SNAPSHOT.

##########
File path: flink-end-to-end-tests/flink-connector-testing-framework/flink-connector-e2e-common/src/main/java/org/apache/flink/connectors/e2e/common/utils/SourceController.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.e2e.common.utils;
+
+import org.apache.flink.connectors.e2e.common.source.ControllableSource;
+import org.apache.flink.connectors.e2e.common.source.SourceControlRpc;
+
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.rmi.NotBoundException;
+import java.rmi.RemoteException;
+import java.rmi.registry.LocateRegistry;
+import java.time.Duration;
+import java.util.List;
+
+public class SourceController implements SourceControlRpc {
+
+	private static final Logger LOG = LoggerFactory.getLogger(SourceController.class);
+
+	private SourceControlRpc stub;
+	private List<Integer> potentialPorts;
+	private boolean connected;
+
+	public SourceController(List<Integer> potentialPorts) {
+		this(ControllableSource.RMI_HOSTNAME, potentialPorts);
+	}
+
+	public SourceController(String host, List<Integer> potentialPorts) {

Review comment:
       host is not used here.

##########
File path: flink-end-to-end-tests/flink-connector-testing-framework/flink-connector-e2e-common/pom.xml
##########
@@ -0,0 +1,83 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>flink-connector-testing-framework</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>0.1-SNAPSHOT</version>

Review comment:
       This should follow the version of the project, which is 1.12-SNAPSHOT at this point.

##########
File path: flink-end-to-end-tests/flink-connector-testing-framework/flink-connector-e2e-common/src/main/java/org/apache/flink/connectors/e2e/common/source/ControllableSource.java
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.e2e.common.source;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.Serializable;
+import java.rmi.RemoteException;
+import java.rmi.registry.LocateRegistry;
+import java.rmi.registry.Registry;
+import java.rmi.server.UnicastRemoteObject;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A Flink source that can be controlled by Java RMI.
+ *
+ * <p>This source is used for generating random records to downstream and recording records into a given local file.
+ * In order to control records to generate and lifecycle of the Flink job using this source, it integrates with
+ * an Java RMI server so that it could be controlled remotely through RPC (basically controlled by testing framework).
+ * </p>
+ */
+public class ControllableSource
+		extends AbstractRichFunction
+		implements SourceFunction<String>, CheckpointedFunction, SourceControlRpc {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ControllableSource.class);
+
+	/*---------------- Java RMI configurations ---------------*/
+	public static final String RMI_PROP_REGISTRY_FILTER = "sun.rmi.registry.registryFilter";
+	public static final String RMI_PROP_SERVER_HOSTNAME = "java.rmi.server.hostname";
+	public static final String RMI_REGISTRY_NAME = "SourceControl";
+	public static final String RMI_HOSTNAME = "127.0.0.1";
+	public static final int RMI_PORT = 15213;

Review comment:
       Will there be a conflict if there are multiple testing processes running in the same host?

##########
File path: flink-end-to-end-tests/flink-connector-testing-framework/flink-connector-e2e-common/src/main/java/org/apache/flink/connectors/e2e/common/TestContext.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.e2e.common;
+
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import java.io.Serializable;
+
+/**
+ * Context of the test.
+ *
+ * <p>User need to provide some instances and information of the test to testing framework, including name of Flink
+ * jobs, instance of tested source/sink and job termination pattern.</p>
+ *
+ * @param <T> Type of elements after deserialization by source and before serialization by sink
+ */
+public interface TestContext<T> extends Serializable {
+
+	String jobName();

Review comment:
       Can we add Java doc for these methods?

##########
File path: flink-end-to-end-tests/flink-connector-testing-framework/flink-connector-e2e-kafka/pom.xml
##########
@@ -0,0 +1,130 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>flink-connector-testing-framework</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>0.1-SNAPSHOT</version>

Review comment:
       The version here should be 1.12-SNAPSHOT.

##########
File path: flink-end-to-end-tests/flink-connector-testing-framework/flink-connector-e2e-common/src/main/java/org/apache/flink/connectors/e2e/common/source/ControllableSource.java
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.e2e.common.source;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.Serializable;
+import java.rmi.RemoteException;
+import java.rmi.registry.LocateRegistry;
+import java.rmi.registry.Registry;
+import java.rmi.server.UnicastRemoteObject;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A Flink source that can be controlled by Java RMI.
+ *
+ * <p>This source is used for generating random records to downstream and recording records into a given local file.
+ * In order to control records to generate and lifecycle of the Flink job using this source, it integrates with
+ * an Java RMI server so that it could be controlled remotely through RPC (basically controlled by testing framework).
+ * </p>
+ */
+public class ControllableSource
+		extends AbstractRichFunction
+		implements SourceFunction<String>, CheckpointedFunction, SourceControlRpc {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ControllableSource.class);
+
+	/*---------------- Java RMI configurations ---------------*/
+	public static final String RMI_PROP_REGISTRY_FILTER = "sun.rmi.registry.registryFilter";
+	public static final String RMI_PROP_SERVER_HOSTNAME = "java.rmi.server.hostname";
+	public static final String RMI_REGISTRY_NAME = "SourceControl";
+	public static final String RMI_HOSTNAME = "127.0.0.1";
+	public static final int RMI_PORT = 15213;
+	private Registry rmiRegistry;
+
+	private volatile boolean isRunning = true;
+	private volatile boolean isStepping = true;
+
+	private final SyncLock syncLock = new SyncLock();
+
+	private final File recordingFile;
+	private BufferedWriter br;
+
+	private AtomicInteger numElementsToEmit;
+
+	private final String endMark;
+
+	public ControllableSource(String recordingFilePath, String endMark) {
+		recordingFile = new File(recordingFilePath);
+		this.endMark = endMark;
+	}
+
+	/*------------------- Checkpoint related---------------------*/
+	@Override
+	public void snapshotState(FunctionSnapshotContext context) {
+
+	}
+
+	@Override
+	public void initializeState(FunctionInitializationContext context) {
+
+	}
+
+
+	/*-------------------- Rich function related -----------------*/
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		// Setup Java RMI
+		java.lang.System.setProperty(RMI_PROP_REGISTRY_FILTER, "java.**;org.apache.flink.**");
+		java.lang.System.setProperty(RMI_PROP_SERVER_HOSTNAME, RMI_HOSTNAME);
+		UnicastRemoteObject.exportObject(this, RMI_PORT);
+		rmiRegistry = LocateRegistry.createRegistry(RMI_PORT);
+		rmiRegistry.bind(RMI_REGISTRY_NAME, this);
+
+		// Setup recording file
+		br = new BufferedWriter(new FileWriter(recordingFile));
+
+		// Setup record counter
+		numElementsToEmit = new AtomicInteger(0);
+	}
+
+	@Override
+	public void run(SourceContext<String> ctx) throws Exception {
+
+		// Main loop
+		while (isRunning) {
+			if (isStepping) {
+				// Step run
+				while (numElementsToEmit.get() > 0) {
+					emitAndRecordElement(ctx, generateRandomString(20));
+					numElementsToEmit.decrementAndGet();
+				}
+				// Double check the status before sleeping
+				if (isStepping) {
+					synchronized (syncLock) {
+						syncLock.wait();
+					}

Review comment:
       This synchronization between the source and external controller seems fragile. For example, what if someone called `cancel()` before this thread goes into the wait? Will this thread miss that `cancel()` notification and potentially wait forever?

##########
File path: flink-end-to-end-tests/flink-connector-testing-framework/flink-connector-e2e-kafka/src/test/resources/log4j2.properties
##########
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+rootLogger.level=INFO

Review comment:
       Setting the default logging level to INFO may make the log too verbose. Can we change this to ERROR or only turn on the INFO logging for some specific loggers?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org