You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2020/12/30 15:52:01 UTC

[flink] branch release-1.11 updated (d400a9b -> 65903ce)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from d400a9b  [FLINK-20803][build][docs] Pin google-java-format version in install instructions
     new bc053b01 [FLINK-20805][table][build] Apply spotless formatting
     new 65903ce  [FLINK-20805][table][build] Remove spotless exclusion for "generated" files

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .git-blame-ignore-revs                             |   1 +
 .../runtime/generated/AggsHandleFunction.java      |  11 +-
 .../runtime/generated/AggsHandleFunctionBase.java  | 107 ++++++++--------
 .../table/runtime/generated/CompileUtils.java      | 136 +++++++++++----------
 .../generated/GeneratedAggsHandleFunction.java     |  12 +-
 .../table/runtime/generated/GeneratedClass.java    | 119 +++++++++---------
 .../runtime/generated/GeneratedCollector.java      |  22 ++--
 .../table/runtime/generated/GeneratedFunction.java |  22 ++--
 .../runtime/generated/GeneratedHashFunction.java   |  26 ++--
 .../table/runtime/generated/GeneratedInput.java    |  22 ++--
 .../runtime/generated/GeneratedJoinCondition.java  |  26 ++--
 .../GeneratedNamespaceAggsHandleFunction.java      |  15 ++-
 .../GeneratedNamespaceTableAggsHandleFunction.java |  15 ++-
 .../generated/GeneratedNormalizedKeyComputer.java  |  24 ++--
 .../table/runtime/generated/GeneratedOperator.java |  22 ++--
 .../runtime/generated/GeneratedProjection.java     |  26 ++--
 .../generated/GeneratedRecordComparator.java       |  27 ++--
 .../generated/GeneratedRecordEqualiser.java        |  26 ++--
 .../runtime/generated/GeneratedResultFuture.java   |  22 ++--
 .../GeneratedTableAggsHandleFunction.java          |  12 +-
 .../generated/GeneratedWatermarkGenerator.java     |  12 +-
 .../table/runtime/generated/HashFunction.java      |   6 +-
 .../table/runtime/generated/JoinCondition.java     |  11 +-
 .../generated/NamespaceAggsHandleFunction.java     |  20 ++-
 .../generated/NamespaceAggsHandleFunctionBase.java |  97 +++++++--------
 .../NamespaceTableAggsHandleFunction.java          |  22 ++--
 .../runtime/generated/NormalizedKeyComputer.java   |  50 +++-----
 .../flink/table/runtime/generated/Projection.java  |   7 +-
 .../table/runtime/generated/RecordComparator.java  |   9 +-
 .../table/runtime/generated/RecordEqualiser.java   |   7 +-
 .../runtime/generated/TableAggsHandleFunction.java |  16 +--
 .../runtime/generated/WatermarkGenerator.java      |  22 ++--
 .../table/runtime/generated/CompileUtilsTest.java  |  65 ++++------
 .../generated/GeneratedCollectorWrapper.java       |  43 +++----
 .../generated/GeneratedFunctionWrapper.java        |  43 +++----
 .../generated/GeneratedResultFutureWrapper.java    |  42 ++++---
 pom.xml                                            |   2 +-
 37 files changed, 554 insertions(+), 613 deletions(-)


[flink] 02/02: [FLINK-20805][table][build] Remove spotless exclusion for "generated" files

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 65903cecc28baeec25416be699157b93950035cf
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed Dec 30 16:51:19 2020 +0100

    [FLINK-20805][table][build] Remove spotless exclusion for "generated" files
---
 .git-blame-ignore-revs | 1 +
 pom.xml                | 2 +-
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs
index feb1c41..51db801 100644
--- a/.git-blame-ignore-revs
+++ b/.git-blame-ignore-revs
@@ -1 +1,2 @@
 3cf402c6759173b0fe48de1567a6cf58000ead72
+bc053b013a94926f25c91a6f09dbaaa3fad0003b
diff --git a/pom.xml b/pom.xml
index f9641bb..22a7c8f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1821,7 +1821,7 @@ under the License.
 							<removeUnusedImports />
 
 							<excludes>
-								<exclude>**/generated/*.java</exclude>
+								<exclude>**/avro/generated/*.java</exclude>
 								<exclude>**/InnerPayLoadAvro.java</exclude>
 								<exclude>**/ComplexPayloadAvro.java</exclude>
 								<exclude>**/example/avro/*.java</exclude>


[flink] 01/02: [FLINK-20805][table][build] Apply spotless formatting

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bc053b013a94926f25c91a6f09dbaaa3fad0003b
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed Dec 30 16:50:24 2020 +0100

    [FLINK-20805][table][build] Apply spotless formatting
---
 .../runtime/generated/AggsHandleFunction.java      |  11 +-
 .../runtime/generated/AggsHandleFunctionBase.java  | 107 ++++++++--------
 .../table/runtime/generated/CompileUtils.java      | 136 +++++++++++----------
 .../generated/GeneratedAggsHandleFunction.java     |  12 +-
 .../table/runtime/generated/GeneratedClass.java    | 119 +++++++++---------
 .../runtime/generated/GeneratedCollector.java      |  22 ++--
 .../table/runtime/generated/GeneratedFunction.java |  22 ++--
 .../runtime/generated/GeneratedHashFunction.java   |  26 ++--
 .../table/runtime/generated/GeneratedInput.java    |  22 ++--
 .../runtime/generated/GeneratedJoinCondition.java  |  26 ++--
 .../GeneratedNamespaceAggsHandleFunction.java      |  15 ++-
 .../GeneratedNamespaceTableAggsHandleFunction.java |  15 ++-
 .../generated/GeneratedNormalizedKeyComputer.java  |  24 ++--
 .../table/runtime/generated/GeneratedOperator.java |  22 ++--
 .../runtime/generated/GeneratedProjection.java     |  26 ++--
 .../generated/GeneratedRecordComparator.java       |  27 ++--
 .../generated/GeneratedRecordEqualiser.java        |  26 ++--
 .../runtime/generated/GeneratedResultFuture.java   |  22 ++--
 .../GeneratedTableAggsHandleFunction.java          |  12 +-
 .../generated/GeneratedWatermarkGenerator.java     |  12 +-
 .../table/runtime/generated/HashFunction.java      |   6 +-
 .../table/runtime/generated/JoinCondition.java     |  11 +-
 .../generated/NamespaceAggsHandleFunction.java     |  20 ++-
 .../generated/NamespaceAggsHandleFunctionBase.java |  97 +++++++--------
 .../NamespaceTableAggsHandleFunction.java          |  22 ++--
 .../runtime/generated/NormalizedKeyComputer.java   |  50 +++-----
 .../flink/table/runtime/generated/Projection.java  |   7 +-
 .../table/runtime/generated/RecordComparator.java  |   9 +-
 .../table/runtime/generated/RecordEqualiser.java   |   7 +-
 .../runtime/generated/TableAggsHandleFunction.java |  16 +--
 .../runtime/generated/WatermarkGenerator.java      |  22 ++--
 .../table/runtime/generated/CompileUtilsTest.java  |  65 ++++------
 .../generated/GeneratedCollectorWrapper.java       |  43 +++----
 .../generated/GeneratedFunctionWrapper.java        |  43 +++----
 .../generated/GeneratedResultFutureWrapper.java    |  42 ++++---
 35 files changed, 552 insertions(+), 612 deletions(-)

diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/AggsHandleFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/AggsHandleFunction.java
index 8bbe4c3..43339d8 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/AggsHandleFunction.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/AggsHandleFunction.java
@@ -30,9 +30,10 @@ import org.apache.flink.table.functions.AggregateFunction;
  */
 public interface AggsHandleFunction extends AggsHandleFunctionBase {
 
-	/**
-	 * Gets the result of the aggregation from the current accumulators.
-	 * @return the final result (saved in a row) of the current accumulators.
-	 */
-	RowData getValue() throws Exception;
+    /**
+     * Gets the result of the aggregation from the current accumulators.
+     *
+     * @return the final result (saved in a row) of the current accumulators.
+     */
+    RowData getValue() throws Exception;
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/AggsHandleFunctionBase.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/AggsHandleFunctionBase.java
index d2f5ae4..471af5a 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/AggsHandleFunctionBase.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/AggsHandleFunctionBase.java
@@ -27,74 +27,71 @@ import org.apache.flink.table.runtime.dataview.StateDataViewStore;
 /**
  * The base class for handling aggregate or table aggregate functions.
  *
- * <p>It is code generated to handle all {@link AggregateFunction}s and
- * {@link TableAggregateFunction}s together in an aggregation.
+ * <p>It is code generated to handle all {@link AggregateFunction}s and {@link
+ * TableAggregateFunction}s together in an aggregation.
  *
  * <p>It is the entry point for aggregate operators to operate all {@link AggregateFunction}s and
  * {@link TableAggregateFunction}s.
  */
 public interface AggsHandleFunctionBase extends Function {
 
-	/**
-	 * Initialization method for the function. It is called before the actual working methods.
-	 */
-	void open(StateDataViewStore store) throws Exception;
+    /** Initialization method for the function. It is called before the actual working methods. */
+    void open(StateDataViewStore store) throws Exception;
 
-	/**
-	 * Accumulates the input values to the accumulators.
-	 * @param input input values bundled in a row
-	 */
-	void accumulate(RowData input) throws Exception;
+    /**
+     * Accumulates the input values to the accumulators.
+     *
+     * @param input input values bundled in a row
+     */
+    void accumulate(RowData input) throws Exception;
 
-	/**
-	 * Retracts the input values from the accumulators.
-	 * @param input input values bundled in a row
-	 */
-	void retract(RowData input) throws Exception;
+    /**
+     * Retracts the input values from the accumulators.
+     *
+     * @param input input values bundled in a row
+     */
+    void retract(RowData input) throws Exception;
 
-	/**
-	 * Merges the other accumulators into current accumulators.
-	 *
-	 * @param accumulators The other row of accumulators
-	 */
-	void merge(RowData accumulators) throws Exception;
+    /**
+     * Merges the other accumulators into current accumulators.
+     *
+     * @param accumulators The other row of accumulators
+     */
+    void merge(RowData accumulators) throws Exception;
 
-	/**
-	 * Set the current accumulators (saved in a row) which contains the current aggregated results.
-	 * In streaming: accumulators are store in the state, we need to restore aggregate buffers from state.
-	 * In batch: accumulators are store in the hashMap, we need to restore aggregate buffers from hashMap.
-	 *
-	 * @param accumulators current accumulators
-	 */
-	void setAccumulators(RowData accumulators) throws Exception;
+    /**
+     * Set the current accumulators (saved in a row) which contains the current aggregated results.
+     * In streaming: accumulators are store in the state, we need to restore aggregate buffers from
+     * state. In batch: accumulators are store in the hashMap, we need to restore aggregate buffers
+     * from hashMap.
+     *
+     * @param accumulators current accumulators
+     */
+    void setAccumulators(RowData accumulators) throws Exception;
 
-	/**
-	 * Resets all the accumulators.
-	 */
-	void resetAccumulators() throws Exception;
+    /** Resets all the accumulators. */
+    void resetAccumulators() throws Exception;
 
-	/**
-	 * Gets the current accumulators (saved in a row) which contains the current
-	 * aggregated results.
-	 * @return the current accumulators
-	 */
-	RowData getAccumulators() throws Exception;
+    /**
+     * Gets the current accumulators (saved in a row) which contains the current aggregated results.
+     *
+     * @return the current accumulators
+     */
+    RowData getAccumulators() throws Exception;
 
-	/**
-	 * Initializes the accumulators and save them to a accumulators row.
-	 *
-	 * @return a row of accumulators which contains the aggregated results
-	 */
-	RowData createAccumulators() throws Exception;
+    /**
+     * Initializes the accumulators and save them to a accumulators row.
+     *
+     * @return a row of accumulators which contains the aggregated results
+     */
+    RowData createAccumulators() throws Exception;
 
-	/**
-	 * Cleanup for the retired accumulators state.
-	 */
-	void cleanup() throws Exception;
+    /** Cleanup for the retired accumulators state. */
+    void cleanup() throws Exception;
 
-	/**
-	 * Tear-down method for this function. It can be used for clean up work.
-	 * By default, this method does nothing.
-	 */
-	void close() throws Exception;
+    /**
+     * Tear-down method for this function. It can be used for clean up work. By default, this method
+     * does nothing.
+     */
+    void close() throws Exception;
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/CompileUtils.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/CompileUtils.java
index e94c19e..f06df82 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/CompileUtils.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/CompileUtils.java
@@ -30,75 +30,81 @@ import org.slf4j.LoggerFactory;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
-/**
- * Utilities to compile a generated code to a Class.
- */
+/** Utilities to compile a generated code to a Class. */
 public final class CompileUtils {
 
-	// used for logging the generated codes to a same place
-	private static final Logger CODE_LOG = LoggerFactory.getLogger(CompileUtils.class);
+    // used for logging the generated codes to a same place
+    private static final Logger CODE_LOG = LoggerFactory.getLogger(CompileUtils.class);
 
-	/**
-	 * Cache of compile, Janino generates a new Class Loader and a new Class file every compile
-	 * (guaranteeing that the class name will not be repeated). This leads to multiple tasks of
-	 * the same process that generate a large number of duplicate class, resulting in a large
-	 * number of Meta zone GC (class unloading), resulting in performance bottlenecks. So we add
-	 * a cache to avoid this problem.
-	 */
-	protected static final Cache<String, Cache<ClassLoader, Class>> COMPILED_CACHE = CacheBuilder
-		.newBuilder()
-		.maximumSize(100)   // estimated cache size
-		.build();
+    /**
+     * Cache of compile, Janino generates a new Class Loader and a new Class file every compile
+     * (guaranteeing that the class name will not be repeated). This leads to multiple tasks of the
+     * same process that generate a large number of duplicate class, resulting in a large number of
+     * Meta zone GC (class unloading), resulting in performance bottlenecks. So we add a cache to
+     * avoid this problem.
+     */
+    protected static final Cache<String, Cache<ClassLoader, Class>> COMPILED_CACHE =
+            CacheBuilder.newBuilder()
+                    .maximumSize(100) // estimated cache size
+                    .build();
 
-	/**
-	 * Compiles a generated code to a Class.
-	 * @param cl the ClassLoader used to load the class
-	 * @param name  the class name
-	 * @param code  the generated code
-	 * @param <T>   the class type
-	 * @return  the compiled class
-	 */
-	@SuppressWarnings("unchecked")
-	public static <T> Class<T> compile(ClassLoader cl, String name, String code) {
-		try {
-			Cache<ClassLoader, Class> compiledClasses = COMPILED_CACHE.get(name,
-					() -> CacheBuilder.newBuilder().maximumSize(5).weakKeys().softValues().build());
-			return compiledClasses.get(cl, () -> doCompile(cl, name, code));
-		} catch (Exception e) {
-			throw new FlinkRuntimeException(e.getMessage(), e);
-		}
-	}
+    /**
+     * Compiles a generated code to a Class.
+     *
+     * @param cl the ClassLoader used to load the class
+     * @param name the class name
+     * @param code the generated code
+     * @param <T> the class type
+     * @return the compiled class
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> Class<T> compile(ClassLoader cl, String name, String code) {
+        try {
+            Cache<ClassLoader, Class> compiledClasses =
+                    COMPILED_CACHE.get(
+                            name,
+                            () ->
+                                    CacheBuilder.newBuilder()
+                                            .maximumSize(5)
+                                            .weakKeys()
+                                            .softValues()
+                                            .build());
+            return compiledClasses.get(cl, () -> doCompile(cl, name, code));
+        } catch (Exception e) {
+            throw new FlinkRuntimeException(e.getMessage(), e);
+        }
+    }
 
-	private static <T> Class<T> doCompile(ClassLoader cl, String name, String code) {
-		checkNotNull(cl, "Classloader must not be null.");
-		CODE_LOG.debug("Compiling: {} \n\n Code:\n{}", name, code);
-		SimpleCompiler compiler = new SimpleCompiler();
-		compiler.setParentClassLoader(cl);
-		try {
-			compiler.cook(code);
-		} catch (Throwable t) {
-			System.out.println(addLineNumber(code));
-			throw new InvalidProgramException(
-				"Table program cannot be compiled. This is a bug. Please file an issue.", t);
-		}
-		try {
-			//noinspection unchecked
-			return (Class<T>) compiler.getClassLoader().loadClass(name);
-		} catch (ClassNotFoundException e) {
-			throw new RuntimeException("Can not load class " + name, e);
-		}
-	}
+    private static <T> Class<T> doCompile(ClassLoader cl, String name, String code) {
+        checkNotNull(cl, "Classloader must not be null.");
+        CODE_LOG.debug("Compiling: {} \n\n Code:\n{}", name, code);
+        SimpleCompiler compiler = new SimpleCompiler();
+        compiler.setParentClassLoader(cl);
+        try {
+            compiler.cook(code);
+        } catch (Throwable t) {
+            System.out.println(addLineNumber(code));
+            throw new InvalidProgramException(
+                    "Table program cannot be compiled. This is a bug. Please file an issue.", t);
+        }
+        try {
+            //noinspection unchecked
+            return (Class<T>) compiler.getClassLoader().loadClass(name);
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException("Can not load class " + name, e);
+        }
+    }
 
-	/**
-	 * To output more information when an error occurs.
-	 * Generally, when cook fails, it shows which line is wrong. This line number starts at 1.
-	 */
-	private static String addLineNumber(String code) {
-		String[] lines = code.split("\n");
-		StringBuilder builder = new StringBuilder();
-		for (int i = 0; i < lines.length; i++) {
-			builder.append("/* ").append(i + 1).append(" */").append(lines[i]).append("\n");
-		}
-		return builder.toString();
-	}
+    /**
+     * To output more information when an error occurs. Generally, when cook fails, it shows which
+     * line is wrong. This line number starts at 1.
+     */
+    private static String addLineNumber(String code) {
+        String[] lines = code.split("\n");
+        StringBuilder builder = new StringBuilder();
+        for (int i = 0; i < lines.length; i++) {
+            builder.append("/* ").append(i + 1).append(" */").append(lines[i]).append("\n");
+        }
+        return builder.toString();
+    }
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedAggsHandleFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedAggsHandleFunction.java
index 2839809..45a58a2 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedAggsHandleFunction.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedAggsHandleFunction.java
@@ -18,14 +18,12 @@
 
 package org.apache.flink.table.runtime.generated;
 
-/**
- * Describes a generated {@link AggsHandleFunction}.
- */
+/** Describes a generated {@link AggsHandleFunction}. */
 public class GeneratedAggsHandleFunction extends GeneratedClass<AggsHandleFunction> {
 
-	private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 1L;
 
-	public GeneratedAggsHandleFunction(String className, String code, Object[] references) {
-		super(className, code, references);
-	}
+    public GeneratedAggsHandleFunction(String className, String code, Object[] references) {
+        super(className, code, references);
+    }
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedClass.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedClass.java
index 4822385..039a3e0 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedClass.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedClass.java
@@ -23,76 +23,75 @@ import java.io.Serializable;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A wrapper for generated class, defines a {@link #newInstance(ClassLoader)} method
- * to get an instance by reference objects easily.
+ * A wrapper for generated class, defines a {@link #newInstance(ClassLoader)} method to get an
+ * instance by reference objects easily.
  */
 public abstract class GeneratedClass<T> implements Serializable {
 
-	private final String className;
-	private final String code;
-	private final Object[] references;
+    private final String className;
+    private final String code;
+    private final Object[] references;
 
-	private transient Class<T> compiledClass;
+    private transient Class<T> compiledClass;
 
-	protected GeneratedClass(String className, String code, Object[] references) {
-		checkNotNull(className, "name must not be null");
-		checkNotNull(code, "code must not be null");
-		checkNotNull(references, "references must not be null");
-		this.className = className;
-		this.code = code;
-		this.references = references;
-	}
+    protected GeneratedClass(String className, String code, Object[] references) {
+        checkNotNull(className, "name must not be null");
+        checkNotNull(code, "code must not be null");
+        checkNotNull(references, "references must not be null");
+        this.className = className;
+        this.code = code;
+        this.references = references;
+    }
 
-	/**
-	 * Create a new instance of this generated class.
-	 */
-	@SuppressWarnings("unchecked")
-	public T newInstance(ClassLoader classLoader) {
-		try {
-			return compile(classLoader).getConstructor(Object[].class)
-					// Because Constructor.newInstance(Object... initargs), we need to load
-					// references into a new Object[], otherwise it cannot be compiled.
-					.newInstance(new Object[] {references});
-		} catch (Exception e) {
-			throw new RuntimeException(
-				"Could not instantiate generated class '" + className + "'", e);
-		}
-	}
+    /** Create a new instance of this generated class. */
+    @SuppressWarnings("unchecked")
+    public T newInstance(ClassLoader classLoader) {
+        try {
+            return compile(classLoader)
+                    .getConstructor(Object[].class)
+                    // Because Constructor.newInstance(Object... initargs), we need to load
+                    // references into a new Object[], otherwise it cannot be compiled.
+                    .newInstance(new Object[] {references});
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Could not instantiate generated class '" + className + "'", e);
+        }
+    }
 
-	@SuppressWarnings("unchecked")
-	public T newInstance(ClassLoader classLoader, Object... args) {
-		try {
-			return (T) compile(classLoader).getConstructors()[0].newInstance(args);
-		} catch (Exception e) {
-			throw new RuntimeException(
-					"Could not instantiate generated class '" + className + "'", e);
-		}
-	}
+    @SuppressWarnings("unchecked")
+    public T newInstance(ClassLoader classLoader, Object... args) {
+        try {
+            return (T) compile(classLoader).getConstructors()[0].newInstance(args);
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Could not instantiate generated class '" + className + "'", e);
+        }
+    }
 
-	/**
-	 * Compiles the generated code, the compiled class will be cached in the {@link GeneratedClass}.
-	 */
-	public Class<T> compile(ClassLoader classLoader) {
-		if (compiledClass == null) {
-			// cache the compiled class
-			compiledClass = CompileUtils.compile(classLoader, className, code);
-		}
-		return compiledClass;
-	}
+    /**
+     * Compiles the generated code, the compiled class will be cached in the {@link GeneratedClass}.
+     */
+    public Class<T> compile(ClassLoader classLoader) {
+        if (compiledClass == null) {
+            // cache the compiled class
+            compiledClass = CompileUtils.compile(classLoader, className, code);
+        }
+        return compiledClass;
+    }
 
-	public String getClassName() {
-		return className;
-	}
+    public String getClassName() {
+        return className;
+    }
 
-	public String getCode() {
-		return code;
-	}
+    public String getCode() {
+        return code;
+    }
 
-	public Object[] getReferences() {
-		return references;
-	}
+    public Object[] getReferences() {
+        return references;
+    }
 
-	public Class<T> getClass(ClassLoader classLoader) {
-		return compile(classLoader);
-	}
+    public Class<T> getClass(ClassLoader classLoader) {
+        return compile(classLoader);
+    }
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedCollector.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedCollector.java
index c360e77..f69e876 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedCollector.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedCollector.java
@@ -27,16 +27,16 @@ import org.apache.flink.util.Collector;
  */
 public class GeneratedCollector<C extends Collector<?>> extends GeneratedClass<C> {
 
-	private static final long serialVersionUID = -7355875544905245676L;
+    private static final long serialVersionUID = -7355875544905245676L;
 
-	/**
-	 * Creates a GeneratedCollector.
-	 *
-	 * @param className class name of the generated Collector.
-	 * @param code code of the generated Collector.
-	 * @param references referenced objects of the generated Collector.
-	 */
-	public GeneratedCollector(String className, String code, Object[] references) {
-		super(className, code, references);
-	}
+    /**
+     * Creates a GeneratedCollector.
+     *
+     * @param className class name of the generated Collector.
+     * @param code code of the generated Collector.
+     * @param references referenced objects of the generated Collector.
+     */
+    public GeneratedCollector(String className, String code, Object[] references) {
+        super(className, code, references);
+    }
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedFunction.java
index 67adfe0..84b494c 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedFunction.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedFunction.java
@@ -27,16 +27,16 @@ import org.apache.flink.api.common.functions.Function;
  */
 public class GeneratedFunction<F extends Function> extends GeneratedClass<F> {
 
-	private static final long serialVersionUID = -7355875544905245676L;
+    private static final long serialVersionUID = -7355875544905245676L;
 
-	/**
-	 * Creates a GeneratedFunction.
-	 *
-	 * @param className class name of the generated Function.
-	 * @param code code of the generated Function.
-	 * @param references referenced objects of the generated Function.
-	 */
-	public GeneratedFunction(String className, String code, Object[] references) {
-		super(className, code, references);
-	}
+    /**
+     * Creates a GeneratedFunction.
+     *
+     * @param className class name of the generated Function.
+     * @param code code of the generated Function.
+     * @param references referenced objects of the generated Function.
+     */
+    public GeneratedFunction(String className, String code, Object[] references) {
+        super(className, code, references);
+    }
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedHashFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedHashFunction.java
index ac4c0b6..d8ed2d0 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedHashFunction.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedHashFunction.java
@@ -18,21 +18,19 @@
 
 package org.apache.flink.table.runtime.generated;
 
-/**
- * Describes a generated {@link HashFunction}.
- */
+/** Describes a generated {@link HashFunction}. */
 public class GeneratedHashFunction extends GeneratedClass<HashFunction> {
 
-	private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 1L;
 
-	/**
-	 * Creates a GeneratedHashFunction.
-	 *
-	 * @param className class name of the generated Function.
-	 * @param code code of the generated Function.
-	 * @param references referenced objects of the generated Function.
-	 */
-	public GeneratedHashFunction(String className, String code, Object[] references) {
-		super(className, code, references);
-	}
+    /**
+     * Creates a GeneratedHashFunction.
+     *
+     * @param className class name of the generated Function.
+     * @param code code of the generated Function.
+     * @param references referenced objects of the generated Function.
+     */
+    public GeneratedHashFunction(String className, String code, Object[] references) {
+        super(className, code, references);
+    }
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedInput.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedInput.java
index 0cce73b..f1e52f4 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedInput.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedInput.java
@@ -27,16 +27,16 @@ import org.apache.flink.api.common.io.InputFormat;
  */
 public class GeneratedInput<F extends InputFormat<?, ?>> extends GeneratedClass<F> {
 
-	private static final long serialVersionUID = -7355875544905245676L;
+    private static final long serialVersionUID = -7355875544905245676L;
 
-	/**
-	 * Creates a GeneratedInput.
-	 *
-	 * @param className class name of the generated Function.
-	 * @param code code of the generated Function.
-	 * @param references referenced objects of the generated Function.
-	 */
-	public GeneratedInput(String className, String code, Object[] references) {
-		super(className, code, references);
-	}
+    /**
+     * Creates a GeneratedInput.
+     *
+     * @param className class name of the generated Function.
+     * @param code code of the generated Function.
+     * @param references referenced objects of the generated Function.
+     */
+    public GeneratedInput(String className, String code, Object[] references) {
+        super(className, code, references);
+    }
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedJoinCondition.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedJoinCondition.java
index bf53de8..ed50bf9 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedJoinCondition.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedJoinCondition.java
@@ -18,21 +18,19 @@
 
 package org.apache.flink.table.runtime.generated;
 
-/**
- * Describes a generated {@link JoinCondition}.
- */
+/** Describes a generated {@link JoinCondition}. */
 public class GeneratedJoinCondition extends GeneratedClass<JoinCondition> {
 
-	private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 1L;
 
-	/**
-	 * Creates a GeneratedJoinCondition.
-	 *
-	 * @param className class name of the generated JoinCondition.
-	 * @param code code of the generated JoinCondition.
-	 * @param references referenced objects of the generated JoinCondition.
-	 */
-	public GeneratedJoinCondition(String className, String code, Object[] references) {
-		super(className, code, references);
-	}
+    /**
+     * Creates a GeneratedJoinCondition.
+     *
+     * @param className class name of the generated JoinCondition.
+     * @param code code of the generated JoinCondition.
+     * @param references referenced objects of the generated JoinCondition.
+     */
+    public GeneratedJoinCondition(String className, String code, Object[] references) {
+        super(className, code, references);
+    }
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedNamespaceAggsHandleFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedNamespaceAggsHandleFunction.java
index f33b044..68a010a 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedNamespaceAggsHandleFunction.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedNamespaceAggsHandleFunction.java
@@ -18,15 +18,14 @@
 
 package org.apache.flink.table.runtime.generated;
 
-/**
- * Describes a generated {@link NamespaceAggsHandleFunction}.
- */
+/** Describes a generated {@link NamespaceAggsHandleFunction}. */
 public class GeneratedNamespaceAggsHandleFunction<N>
-	extends GeneratedClass<NamespaceAggsHandleFunction<N>> {
+        extends GeneratedClass<NamespaceAggsHandleFunction<N>> {
 
-	private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 1L;
 
-	public GeneratedNamespaceAggsHandleFunction(String className, String code, Object[] references) {
-		super(className, code, references);
-	}
+    public GeneratedNamespaceAggsHandleFunction(
+            String className, String code, Object[] references) {
+        super(className, code, references);
+    }
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedNamespaceTableAggsHandleFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedNamespaceTableAggsHandleFunction.java
index 7237f36..6a108d2 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedNamespaceTableAggsHandleFunction.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedNamespaceTableAggsHandleFunction.java
@@ -18,15 +18,14 @@
 
 package org.apache.flink.table.runtime.generated;
 
-/**
- * Describes a generated {@link NamespaceTableAggsHandleFunction}.
- */
+/** Describes a generated {@link NamespaceTableAggsHandleFunction}. */
 public class GeneratedNamespaceTableAggsHandleFunction<N>
-	extends GeneratedClass<NamespaceTableAggsHandleFunction<N>> {
+        extends GeneratedClass<NamespaceTableAggsHandleFunction<N>> {
 
-	private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 1L;
 
-	public GeneratedNamespaceTableAggsHandleFunction(String className, String code, Object[] references) {
-		super(className, code, references);
-	}
+    public GeneratedNamespaceTableAggsHandleFunction(
+            String className, String code, Object[] references) {
+        super(className, code, references);
+    }
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedNormalizedKeyComputer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedNormalizedKeyComputer.java
index 0ef2750..79196a4 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedNormalizedKeyComputer.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedNormalizedKeyComputer.java
@@ -18,20 +18,18 @@
 
 package org.apache.flink.table.runtime.generated;
 
-/**
- * Describes a generated {@link NormalizedKeyComputer}.
- */
+/** Describes a generated {@link NormalizedKeyComputer}. */
 public class GeneratedNormalizedKeyComputer extends GeneratedClass<NormalizedKeyComputer> {
 
-	private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 1L;
 
-	/**
-	 * Creates a GeneratedNormalizedKeyComputer.
-	 *
-	 * @param className class name of the generated class.
-	 * @param code code of the generated class.
-	 */
-	public GeneratedNormalizedKeyComputer(String className, String code) {
-		super(className, code, new Object[0]);
-	}
+    /**
+     * Creates a GeneratedNormalizedKeyComputer.
+     *
+     * @param className class name of the generated class.
+     * @param code code of the generated class.
+     */
+    public GeneratedNormalizedKeyComputer(String className, String code) {
+        super(className, code, new Object[0]);
+    }
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedOperator.java
index e24aeca..09b9015 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedOperator.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedOperator.java
@@ -27,16 +27,16 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
  */
 public class GeneratedOperator<C extends StreamOperator<?>> extends GeneratedClass<C> {
 
-	private static final long serialVersionUID = -7355875544905245676L;
+    private static final long serialVersionUID = -7355875544905245676L;
 
-	/**
-	 * Creates a GeneratedOperator.
-	 *
-	 * @param className class name of the generated StreamOperator.
-	 * @param code code of the generated StreamOperator.
-	 * @param references referenced objects of the generated StreamOperator.
-	 */
-	public GeneratedOperator(String className, String code, Object[] references) {
-		super(className, code, references);
-	}
+    /**
+     * Creates a GeneratedOperator.
+     *
+     * @param className class name of the generated StreamOperator.
+     * @param code code of the generated StreamOperator.
+     * @param references referenced objects of the generated StreamOperator.
+     */
+    public GeneratedOperator(String className, String code, Object[] references) {
+        super(className, code, references);
+    }
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedProjection.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedProjection.java
index 306a783..ff702dd 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedProjection.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedProjection.java
@@ -18,21 +18,19 @@
 
 package org.apache.flink.table.runtime.generated;
 
-/**
- * Describes a generated {@link Projection}.
- */
+/** Describes a generated {@link Projection}. */
 public class GeneratedProjection extends GeneratedClass<Projection> {
 
-	private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 1L;
 
-	/**
-	 * Creates a GeneratedProjection.
-	 *
-	 * @param className class name of the generated Function.
-	 * @param code code of the generated Function.
-	 * @param references referenced objects of the generated Function.
-	 */
-	public GeneratedProjection(String className, String code, Object[] references) {
-		super(className, code, references);
-	}
+    /**
+     * Creates a GeneratedProjection.
+     *
+     * @param className class name of the generated Function.
+     * @param code code of the generated Function.
+     * @param references referenced objects of the generated Function.
+     */
+    public GeneratedProjection(String className, String code, Object[] references) {
+        super(className, code, references);
+    }
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedRecordComparator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedRecordComparator.java
index d266e8b..9bbba60 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedRecordComparator.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedRecordComparator.java
@@ -18,22 +18,19 @@
 
 package org.apache.flink.table.runtime.generated;
 
-/**
- * Describes a generated {@link RecordComparator}.
- */
+/** Describes a generated {@link RecordComparator}. */
 public class GeneratedRecordComparator extends GeneratedClass<RecordComparator> {
 
-	private static final long serialVersionUID = 1L;
-
-	/**
-	 * Creates a GeneratedRecordComparator.
-	 *
-	 * @param className class name of the generated class.
-	 * @param code code of the generated class.
-	 * @param references referenced objects of the generated class.
-	 */
-	public GeneratedRecordComparator(String className, String code, Object[] references) {
-		super(className, code, references);
-	}
+    private static final long serialVersionUID = 1L;
 
+    /**
+     * Creates a GeneratedRecordComparator.
+     *
+     * @param className class name of the generated class.
+     * @param code code of the generated class.
+     * @param references referenced objects of the generated class.
+     */
+    public GeneratedRecordComparator(String className, String code, Object[] references) {
+        super(className, code, references);
+    }
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedRecordEqualiser.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedRecordEqualiser.java
index d8fc2bb..f266e57 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedRecordEqualiser.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedRecordEqualiser.java
@@ -18,21 +18,19 @@
 
 package org.apache.flink.table.runtime.generated;
 
-/**
- * Describes a generated {@link RecordEqualiser}.
- */
+/** Describes a generated {@link RecordEqualiser}. */
 public class GeneratedRecordEqualiser extends GeneratedClass<RecordEqualiser> {
 
-	private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 1L;
 
-	/**
-	 * Creates a GeneratedRecordEqualiser.
-	 *
-	 * @param className  class name of the generated class.
-	 * @param code       code of the generated class.
-	 * @param references referenced objects of the generated class.
-	 */
-	public GeneratedRecordEqualiser(String className, String code, Object[] references) {
-		super(className, code, references);
-	}
+    /**
+     * Creates a GeneratedRecordEqualiser.
+     *
+     * @param className class name of the generated class.
+     * @param code code of the generated class.
+     * @param references referenced objects of the generated class.
+     */
+    public GeneratedRecordEqualiser(String className, String code, Object[] references) {
+        super(className, code, references);
+    }
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedResultFuture.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedResultFuture.java
index c8ba5f98..fd5ca2c 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedResultFuture.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedResultFuture.java
@@ -27,16 +27,16 @@ import org.apache.flink.streaming.api.functions.async.ResultFuture;
  */
 public class GeneratedResultFuture<T extends ResultFuture<?>> extends GeneratedClass<T> {
 
-	private static final long serialVersionUID = -7355875544905245676L;
+    private static final long serialVersionUID = -7355875544905245676L;
 
-	/**
-	 * Creates a GeneratedResultFuture.
-	 *
-	 * @param className class name of the generated ResultFuture.
-	 * @param code code of the generated ResultFuture.
-	 * @param references referenced objects of the generated ResultFuture.
-	 */
-	public GeneratedResultFuture(String className, String code, Object[] references) {
-		super(className, code, references);
-	}
+    /**
+     * Creates a GeneratedResultFuture.
+     *
+     * @param className class name of the generated ResultFuture.
+     * @param code code of the generated ResultFuture.
+     * @param references referenced objects of the generated ResultFuture.
+     */
+    public GeneratedResultFuture(String className, String code, Object[] references) {
+        super(className, code, references);
+    }
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedTableAggsHandleFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedTableAggsHandleFunction.java
index f45dd7c..395135b 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedTableAggsHandleFunction.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedTableAggsHandleFunction.java
@@ -18,14 +18,12 @@
 
 package org.apache.flink.table.runtime.generated;
 
-/**
- * Describes a generated {@link TableAggsHandleFunction}.
- */
+/** Describes a generated {@link TableAggsHandleFunction}. */
 public class GeneratedTableAggsHandleFunction extends GeneratedClass<TableAggsHandleFunction> {
 
-	private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 1L;
 
-	public GeneratedTableAggsHandleFunction(String className, String code, Object[] references) {
-		super(className, code, references);
-	}
+    public GeneratedTableAggsHandleFunction(String className, String code, Object[] references) {
+        super(className, code, references);
+    }
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedWatermarkGenerator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedWatermarkGenerator.java
index 18bd45a..47a7cde 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedWatermarkGenerator.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedWatermarkGenerator.java
@@ -18,14 +18,12 @@
 
 package org.apache.flink.table.runtime.generated;
 
-/**
- * Describes a generated {@link WatermarkGenerator}.
- */
+/** Describes a generated {@link WatermarkGenerator}. */
 public class GeneratedWatermarkGenerator extends GeneratedClass<WatermarkGenerator> {
 
-	private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 1L;
 
-	public GeneratedWatermarkGenerator(String className, String code, Object[] references) {
-		super(className, code, references);
-	}
+    public GeneratedWatermarkGenerator(String className, String code, Object[] references) {
+        super(className, code, references);
+    }
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/HashFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/HashFunction.java
index 51295b9..990e019 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/HashFunction.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/HashFunction.java
@@ -21,11 +21,9 @@ package org.apache.flink.table.runtime.generated;
 import org.apache.flink.table.data.RowData;
 
 /**
- * Interface for code generated hash code of {@link RowData}, which will select some
- * fields to hash.
+ * Interface for code generated hash code of {@link RowData}, which will select some fields to hash.
  */
 public interface HashFunction {
 
-	int hashCode(RowData row);
-
+    int hashCode(RowData row);
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/JoinCondition.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/JoinCondition.java
index 73b1342..473ad6e 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/JoinCondition.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/JoinCondition.java
@@ -21,14 +21,9 @@ package org.apache.flink.table.runtime.generated;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.table.data.RowData;
 
-/**
- * Interface for code generated condition function for [[org.apache.calcite.rel.core.Join]].
- */
+/** Interface for code generated condition function for [[org.apache.calcite.rel.core.Join]]. */
 public interface JoinCondition extends RichFunction {
 
-	/**
-	 * @return true if the join condition stays true for the joined row (in1, in2)
-	 */
-	boolean apply(RowData in1, RowData in2);
-
+    /** @return true if the join condition stays true for the joined row (in1, in2) */
+    boolean apply(RowData in1, RowData in2);
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/NamespaceAggsHandleFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/NamespaceAggsHandleFunction.java
index 79f0164..a5ef0ff 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/NamespaceAggsHandleFunction.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/NamespaceAggsHandleFunction.java
@@ -20,17 +20,15 @@ package org.apache.flink.table.runtime.generated;
 
 import org.apache.flink.table.data.RowData;
 
-/**
- * The base class for handling aggregate functions with namespace.
- */
+/** The base class for handling aggregate functions with namespace. */
 public interface NamespaceAggsHandleFunction<N> extends NamespaceAggsHandleFunctionBase<N> {
 
-	/**
-	 * Gets the result of the aggregation from the current accumulators and
-	 * namespace properties (like window start).
-	 *
-	 * @param namespace the namespace properties which should be calculated, such window start
-	 * @return the final result (saved in a row) of the current accumulators.
-	 */
-	RowData getValue(N namespace) throws Exception;
+    /**
+     * Gets the result of the aggregation from the current accumulators and namespace properties
+     * (like window start).
+     *
+     * @param namespace the namespace properties which should be calculated, such as window start
+     * @return the final result (saved in a row) of the current accumulators.
+     */
+    RowData getValue(N namespace) throws Exception;
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/NamespaceAggsHandleFunctionBase.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/NamespaceAggsHandleFunctionBase.java
index 6e2b43d..53a0af2 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/NamespaceAggsHandleFunctionBase.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/NamespaceAggsHandleFunctionBase.java
@@ -25,67 +25,64 @@ import org.apache.flink.table.runtime.dataview.StateDataViewStore;
 /**
  * The base class for handling aggregate or table aggregate functions.
  *
- * <p>The differences between {@link NamespaceAggsHandleFunctionBase} and
- * {@link AggsHandleFunctionBase} is that the {@link NamespaceAggsHandleFunctionBase} has namespace.
+ * <p>The difference between {@link NamespaceAggsHandleFunctionBase} and {@link
+ * AggsHandleFunctionBase} is that the {@link NamespaceAggsHandleFunctionBase} has namespace.
  *
  * @param <N> type of namespace
  */
 public interface NamespaceAggsHandleFunctionBase<N> extends Function {
 
-	/**
-	 * Initialization method for the function. It is called before the actual working methods.
-	 */
-	void open(StateDataViewStore store) throws Exception;
+    /** Initialization method for the function. It is called before the actual working methods. */
+    void open(StateDataViewStore store) throws Exception;
 
-	/**
-	 * Set the current accumulators (saved in a row) which contains the current
-	 * aggregated results.
-	 * @param accumulators current accumulators
-	 */
-	void setAccumulators(N namespace, RowData accumulators) throws Exception;
+    /**
+     * Set the current accumulators (saved in a row) which contains the current aggregated results.
+     *
+     * @param accumulators current accumulators
+     */
+    void setAccumulators(N namespace, RowData accumulators) throws Exception;
 
-	/**
-	 * Accumulates the input values to the accumulators.
-	 * @param inputRow input values bundled in a row
-	 */
-	void accumulate(RowData inputRow) throws Exception;
+    /**
+     * Accumulates the input values to the accumulators.
+     *
+     * @param inputRow input values bundled in a row
+     */
+    void accumulate(RowData inputRow) throws Exception;
 
-	/**
-	 * Retracts the input values from the accumulators.
-	 * @param inputRow input values bundled in a row
-	 */
-	void retract(RowData inputRow) throws Exception;
+    /**
+     * Retracts the input values from the accumulators.
+     *
+     * @param inputRow input values bundled in a row
+     */
+    void retract(RowData inputRow) throws Exception;
 
-	/**
-	 * Merges the other accumulators into current accumulators.
-	 *
-	 * @param otherAcc The other row of accumulators
-	 */
-	void merge(N namespace, RowData otherAcc) throws Exception;
+    /**
+     * Merges the other accumulators into current accumulators.
+     *
+     * @param otherAcc The other row of accumulators
+     */
+    void merge(N namespace, RowData otherAcc) throws Exception;
 
-	/**
-	 * Initializes the accumulators and save them to a accumulators row.
-	 *
-	 * @return a row of accumulators which contains the aggregated results
-	 */
-	RowData createAccumulators() throws Exception;
+    /**
+     * Initializes the accumulators and saves them to an accumulators row.
+     *
+     * @return a row of accumulators which contains the aggregated results
+     */
+    RowData createAccumulators() throws Exception;
 
-	/**
-	 * Gets the current accumulators (saved in a row) which contains the current
-	 * aggregated results.
-	 * @return the current accumulators
-	 */
-	RowData getAccumulators() throws Exception;
+    /**
+     * Gets the current accumulators (saved in a row) which contains the current aggregated results.
+     *
+     * @return the current accumulators
+     */
+    RowData getAccumulators() throws Exception;
 
-	/**
-	 * Cleanup for the retired accumulators state.
-	 */
-	void cleanup(N namespace) throws Exception;
-
-	/**
-	 * Tear-down method for this function. It can be used for clean up work.
-	 * By default, this method does nothing.
-	 */
-	void close() throws Exception;
+    /** Cleanup for the retired accumulators state. */
+    void cleanup(N namespace) throws Exception;
 
+    /**
+     * Tear-down method for this function. It can be used for clean up work. By default, this method
+     * does nothing.
+     */
+    void close() throws Exception;
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/NamespaceTableAggsHandleFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/NamespaceTableAggsHandleFunction.java
index 7610040..93c5c68 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/NamespaceTableAggsHandleFunction.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/NamespaceTableAggsHandleFunction.java
@@ -21,18 +21,16 @@ package org.apache.flink.table.runtime.generated;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.util.Collector;
 
-/**
- * The base class for handling table aggregate functions with namespace.
- */
+/** The base class for handling table aggregate functions with namespace. */
 public interface NamespaceTableAggsHandleFunction<N> extends NamespaceAggsHandleFunctionBase<N> {
 
-	/**
-	 * Emits the result of the aggregation from the current accumulators and namespace
-	 * properties (like window start).
-	 *
-	 * @param namespace the namespace properties which should be calculated, such window start
-	 * @param key       the group key for the current emit.
-	 * @param out       the collector used to emit results.
-	 */
-	void emitValue(N namespace, RowData key, Collector<RowData> out) throws Exception;
+    /**
+     * Emits the result of the aggregation from the current accumulators and namespace properties
+     * (like window start).
+     *
+     * @param namespace the namespace properties which should be calculated, such as window start
+     * @param key the group key for the current emit.
+     * @param out the collector used to emit results.
+     */
+    void emitValue(N namespace, RowData key, Collector<RowData> out) throws Exception;
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/NormalizedKeyComputer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/NormalizedKeyComputer.java
index c5e7f13..ce46290 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/NormalizedKeyComputer.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/NormalizedKeyComputer.java
@@ -23,38 +23,26 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.operators.sort.BinaryInMemorySortBuffer;
 
 /**
- * Normalized key computer for {@link BinaryInMemorySortBuffer}.
- * For performance, subclasses are usually implemented through CodeGenerator.
+ * Normalized key computer for {@link BinaryInMemorySortBuffer}. For performance, subclasses are
+ * usually implemented through CodeGenerator.
  */
 public interface NormalizedKeyComputer {
 
-	/**
-	 * Writes a normalized key for the given record into the target {@link MemorySegment}.
-	 */
-	void putKey(RowData record, MemorySegment target, int offset);
-
-	/**
-	 * Compares two normalized keys in respective {@link MemorySegment}.
-	 */
-	int compareKey(MemorySegment segI, int offsetI, MemorySegment segJ, int offsetJ);
-
-	/**
-	 * Swaps two normalized keys in respective {@link MemorySegment}.
-	 */
-	void swapKey(MemorySegment segI, int offsetI, MemorySegment segJ, int offsetJ);
-
-	/**
-	 * Get normalized keys bytes length.
-	 */
-	int getNumKeyBytes();
-
-	/**
-	 * whether the normalized key can fully determines the comparison.
-	 */
-	boolean isKeyFullyDetermines();
-
-	/**
-	 * Flag whether normalized key comparisons should be inverted key.
-	 */
-	boolean invertKey();
+    /** Writes a normalized key for the given record into the target {@link MemorySegment}. */
+    void putKey(RowData record, MemorySegment target, int offset);
+
+    /** Compares two normalized keys in respective {@link MemorySegment}. */
+    int compareKey(MemorySegment segI, int offsetI, MemorySegment segJ, int offsetJ);
+
+    /** Swaps two normalized keys in respective {@link MemorySegment}. */
+    void swapKey(MemorySegment segI, int offsetI, MemorySegment segJ, int offsetJ);
+
+    /** Get normalized keys bytes length. */
+    int getNumKeyBytes();
+
+    /** Whether the normalized key can fully determine the comparison. */
+    boolean isKeyFullyDetermines();
+
+    /** Flag whether normalized key comparisons should be inverted. */
+    boolean invertKey();
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/Projection.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/Projection.java
index 62139bd..3cf2b5a 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/Projection.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/Projection.java
@@ -20,11 +20,8 @@ package org.apache.flink.table.runtime.generated;
 
 import org.apache.flink.table.data.RowData;
 
-/**
- * Interface for code generated projection, which will map a RowData to another one.
- */
+/** Interface for code generated projection, which will map a RowData to another one. */
 public interface Projection<IN extends RowData, OUT extends RowData> {
 
-	OUT apply(IN row);
-
+    OUT apply(IN row);
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/RecordComparator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/RecordComparator.java
index 31f68a1..9d2500c 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/RecordComparator.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/RecordComparator.java
@@ -25,12 +25,11 @@ import java.io.Serializable;
 import java.util.Comparator;
 
 /**
- * Record comparator for {@link BinaryInMemorySortBuffer}.
- * For performance, subclasses are usually implemented through CodeGenerator.
- * A new interface for helping JVM inline.
+ * Record comparator for {@link BinaryInMemorySortBuffer}. For performance, subclasses are usually
+ * implemented through CodeGenerator. A new interface for helping JVM inline.
  */
 public interface RecordComparator extends Comparator<RowData>, Serializable {
 
-	@Override
-	int compare(RowData o1, RowData o2);
+    @Override
+    int compare(RowData o1, RowData o2);
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/RecordEqualiser.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/RecordEqualiser.java
index 03ccb9b..4967414 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/RecordEqualiser.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/RecordEqualiser.java
@@ -27,9 +27,6 @@ import java.io.Serializable;
  */
 public interface RecordEqualiser extends Serializable {
 
-	/**
-	 * Returns {@code true} if the rows are equal to each other
-	 * and {@code false} otherwise.
-	 */
-	boolean equals(RowData row1, RowData row2);
+    /** Returns {@code true} if the rows are equal to each other and {@code false} otherwise. */
+    boolean equals(RowData row1, RowData row2);
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/TableAggsHandleFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/TableAggsHandleFunction.java
index 1d721c7..7f279f1 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/TableAggsHandleFunction.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/TableAggsHandleFunction.java
@@ -31,12 +31,12 @@ import org.apache.flink.util.Collector;
  */
 public interface TableAggsHandleFunction extends AggsHandleFunctionBase {
 
-	/**
-	 * Emit the result of the table aggregation through the collector.
-	 *
-	 * @param out        the collector used to emit records.
-	 * @param currentKey the current group key.
-	 * @param isRetract  the retraction flag which indicates whether emit retract values.
-	 */
-	void emitValue(Collector<RowData> out, RowData currentKey, boolean isRetract) throws Exception;
+    /**
+     * Emit the result of the table aggregation through the collector.
+     *
+     * @param out the collector used to emit records.
+     * @param currentKey the current group key.
+     * @param isRetract the retraction flag which indicates whether to emit retract values.
+     */
+    void emitValue(Collector<RowData> out, RowData currentKey, boolean isRetract) throws Exception;
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/WatermarkGenerator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/WatermarkGenerator.java
index cd5294b..7a9898c 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/WatermarkGenerator.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/WatermarkGenerator.java
@@ -23,19 +23,17 @@ import org.apache.flink.table.data.RowData;
 
 import javax.annotation.Nullable;
 
-/**
- * The {@link WatermarkGenerator} is used to generate watermark based the input elements.
- */
+/** The {@link WatermarkGenerator} is used to generate watermarks based on the input elements. */
 public abstract class WatermarkGenerator extends AbstractRichFunction {
 
-	private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 1L;
 
-	/**
-	 * Returns the watermark for the current row or null if no watermark should be generated.
-	 *
-	 * @param row The current row.
-	 * @return The watermark for this row or null if no watermark should be generated.
-	 */
-	@Nullable
-	public abstract Long currentWatermark(RowData row) throws Exception;
+    /**
+     * Returns the watermark for the current row or null if no watermark should be generated.
+     *
+     * @param row The current row.
+     * @return The watermark for this row or null if no watermark should be generated.
+     */
+    @Nullable
+    public abstract Long currentWatermark(RowData row) throws Exception;
 }
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/generated/CompileUtilsTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/generated/CompileUtilsTest.java
index fb9f985..7efb26a 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/generated/CompileUtilsTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/generated/CompileUtilsTest.java
@@ -31,51 +31,40 @@ import java.net.URLClassLoader;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertSame;
 
-/**
- * Tests for {@link CompileUtils}.
- */
+/** Tests for {@link CompileUtils}. */
 public class CompileUtilsTest {
 
-	@Rule
-	public ExpectedException thrown = ExpectedException.none();
+    @Rule public ExpectedException thrown = ExpectedException.none();
 
-	@Before
-	public void before() {
-		// cleanup cached class before tests
-		CompileUtils.COMPILED_CACHE.invalidateAll();
-	}
+    @Before
+    public void before() {
+        // cleanup cached class before tests
+        CompileUtils.COMPILED_CACHE.invalidateAll();
+    }
 
-	@Test
-	public void testCacheReuse() {
-		String code =
-			"public class Main {\n" +
-			"  int i;\n" +
-			"  int j;\n" +
-			"}";
+    @Test
+    public void testCacheReuse() {
+        String code = "public class Main {\n" + "  int i;\n" + "  int j;\n" + "}";
 
-		Class<?> class1 = CompileUtils.compile(this.getClass().getClassLoader(), "Main", code);
-		Class<?> class2 = CompileUtils.compile(this.getClass().getClassLoader(), "Main", code);
-		Class<?> class3 = CompileUtils.compile(new TestClassLoader(), "Main", code);
-		assertSame(class1, class2);
-		assertNotSame(class1, class3);
-	}
+        Class<?> class1 = CompileUtils.compile(this.getClass().getClassLoader(), "Main", code);
+        Class<?> class2 = CompileUtils.compile(this.getClass().getClassLoader(), "Main", code);
+        Class<?> class3 = CompileUtils.compile(new TestClassLoader(), "Main", code);
+        assertSame(class1, class2);
+        assertNotSame(class1, class3);
+    }
 
-	@Test
-	public void testWrongCode() {
-		String code =
-			"public class111 Main {\n" +
-			"  int i;\n" +
-			"  int j;\n" +
-			"}";
+    @Test
+    public void testWrongCode() {
+        String code = "public class111 Main {\n" + "  int i;\n" + "  int j;\n" + "}";
 
-		thrown.expect(FlinkRuntimeException.class);
-		CompileUtils.compile(this.getClass().getClassLoader(), "Main", code);
-	}
+        thrown.expect(FlinkRuntimeException.class);
+        CompileUtils.compile(this.getClass().getClassLoader(), "Main", code);
+    }
 
-	private static class TestClassLoader extends URLClassLoader {
+    private static class TestClassLoader extends URLClassLoader {
 
-		TestClassLoader() {
-			super(new URL[0], Thread.currentThread().getContextClassLoader());
-		}
-	}
+        TestClassLoader() {
+            super(new URL[0], Thread.currentThread().getContextClassLoader());
+        }
+    }
 }
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/generated/GeneratedCollectorWrapper.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/generated/GeneratedCollectorWrapper.java
index 74931ca..f26a998 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/generated/GeneratedCollectorWrapper.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/generated/GeneratedCollectorWrapper.java
@@ -21,31 +21,32 @@ package org.apache.flink.table.runtime.generated;
 import org.apache.flink.util.Collector;
 
 /**
- * A wrapper for {@link GeneratedCollector} which wraps a class instead of generated code in it.
- * It is only used for easy testing.
+ * A wrapper for {@link GeneratedCollector} which wraps a class instead of generated code in it. It
+ * is only used for easy testing.
  */
 public class GeneratedCollectorWrapper<C extends Collector<?>> extends GeneratedCollector<C> {
 
-	private static final long serialVersionUID = 3964204655565783705L;
-	private final Class<C> clazz;
+    private static final long serialVersionUID = 3964204655565783705L;
+    private final Class<C> clazz;
 
-	public GeneratedCollectorWrapper(C collector) {
-		super(collector.getClass().getSimpleName(), "N/A", new Object[0]);
-		//noinspection unchecked
-		this.clazz = (Class<C>) collector.getClass();
-	}
+    public GeneratedCollectorWrapper(C collector) {
+        super(collector.getClass().getSimpleName(), "N/A", new Object[0]);
+        //noinspection unchecked
+        this.clazz = (Class<C>) collector.getClass();
+    }
 
-	@Override
-	public C newInstance(ClassLoader classLoader) {
-		try {
-			return clazz.newInstance();
-		} catch (Exception e) {
-			throw new RuntimeException("Could not instantiate class " + clazz.getCanonicalName(), e);
-		}
-	}
+    @Override
+    public C newInstance(ClassLoader classLoader) {
+        try {
+            return clazz.newInstance();
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Could not instantiate class " + clazz.getCanonicalName(), e);
+        }
+    }
 
-	@Override
-	public Class<C> compile(ClassLoader classLoader) {
-		return clazz;
-	}
+    @Override
+    public Class<C> compile(ClassLoader classLoader) {
+        return clazz;
+    }
 }
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/generated/GeneratedFunctionWrapper.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/generated/GeneratedFunctionWrapper.java
index bad23c0..589cb5e 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/generated/GeneratedFunctionWrapper.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/generated/GeneratedFunctionWrapper.java
@@ -21,31 +21,32 @@ package org.apache.flink.table.runtime.generated;
 import org.apache.flink.api.common.functions.Function;
 
 /**
- * A wrapper for {@link GeneratedFunction} which wraps a class instead of generated code in it.
- * It is only used for easy testing.
+ * A wrapper for {@link GeneratedFunction} which wraps a class instead of generated code in it. It
+ * is only used for easy testing.
  */
 public class GeneratedFunctionWrapper<F extends Function> extends GeneratedFunction<F> {
 
-	private static final long serialVersionUID = 3964204655565783705L;
-	private final Class<F> clazz;
+    private static final long serialVersionUID = 3964204655565783705L;
+    private final Class<F> clazz;
 
-	public GeneratedFunctionWrapper(F function) {
-		super(function.getClass().getSimpleName(), "N/A", new Object[0]);
-		//noinspection unchecked
-		this.clazz = (Class<F>) function.getClass();
-	}
+    public GeneratedFunctionWrapper(F function) {
+        super(function.getClass().getSimpleName(), "N/A", new Object[0]);
+        //noinspection unchecked
+        this.clazz = (Class<F>) function.getClass();
+    }
 
-	@Override
-	public F newInstance(ClassLoader classLoader) {
-		try {
-			return clazz.newInstance();
-		} catch (Exception e) {
-			throw new RuntimeException("Could not instantiate class " + clazz.getCanonicalName(), e);
-		}
-	}
+    @Override
+    public F newInstance(ClassLoader classLoader) {
+        try {
+            return clazz.newInstance();
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Could not instantiate class " + clazz.getCanonicalName(), e);
+        }
+    }
 
-	@Override
-	public Class<F> compile(ClassLoader classLoader) {
-		return clazz;
-	}
+    @Override
+    public Class<F> compile(ClassLoader classLoader) {
+        return clazz;
+    }
 }
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/generated/GeneratedResultFutureWrapper.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/generated/GeneratedResultFutureWrapper.java
index 61f9fbb..bafc7e0 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/generated/GeneratedResultFutureWrapper.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/generated/GeneratedResultFutureWrapper.java
@@ -24,28 +24,30 @@ import org.apache.flink.streaming.api.functions.async.ResultFuture;
  * A wrapper for {@link GeneratedResultFuture} which wraps a class instead of generated code in it.
  * It is only used for easy testing.
  */
-public class GeneratedResultFutureWrapper<T extends ResultFuture<?>> extends GeneratedResultFuture<T> {
+public class GeneratedResultFutureWrapper<T extends ResultFuture<?>>
+        extends GeneratedResultFuture<T> {
 
-	private static final long serialVersionUID = 3964204655565783705L;
-	private final Class<T> clazz;
+    private static final long serialVersionUID = 3964204655565783705L;
+    private final Class<T> clazz;
 
-	public GeneratedResultFutureWrapper(T resultFuture) {
-		super(resultFuture.getClass().getSimpleName(), "N/A", new Object[0]);
-		//noinspection unchecked
-		this.clazz = (Class<T>) resultFuture.getClass();
-	}
+    public GeneratedResultFutureWrapper(T resultFuture) {
+        super(resultFuture.getClass().getSimpleName(), "N/A", new Object[0]);
+        //noinspection unchecked
+        this.clazz = (Class<T>) resultFuture.getClass();
+    }
 
-	@Override
-	public T newInstance(ClassLoader classLoader) {
-		try {
-			return clazz.newInstance();
-		} catch (Exception e) {
-			throw new RuntimeException("Could not instantiate class " + clazz.getCanonicalName(), e);
-		}
-	}
+    @Override
+    public T newInstance(ClassLoader classLoader) {
+        try {
+            return clazz.newInstance();
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Could not instantiate class " + clazz.getCanonicalName(), e);
+        }
+    }
 
-	@Override
-	public Class<T> compile(ClassLoader classLoader) {
-		return clazz;
-	}
+    @Override
+    public Class<T> compile(ClassLoader classLoader) {
+        return clazz;
+    }
 }