Posted to commits@flink.apache.org by ch...@apache.org on 2023/07/10 16:03:26 UTC

[flink-connector-hbase] 08/26: [FLINK-20651] Format code with Spotless/google-java-format

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-hbase.git

commit 9be77d73f9bed1d605a00e21c72c2661ade43c42
Author: Rufus Refactor <de...@flink.apache.org>
AuthorDate: Mon Dec 28 14:30:59 2020 +0100

    [FLINK-20651] Format code with Spotless/google-java-format
---
 .../flink/tests/util/hbase/HBaseResource.java      |  88 ++---
 .../tests/util/hbase/HBaseResourceFactory.java     |  23 +-
 .../util/hbase/LocalStandaloneHBaseResource.java   | 387 +++++++++++----------
 .../hbase/LocalStandaloneHBaseResourceFactory.java |  12 +-
 .../tests/util/hbase/SQLClientHBaseITCase.java     | 320 +++++++++--------
 5 files changed, 433 insertions(+), 397 deletions(-)
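
The change below is purely mechanical: Spotless runs google-java-format (in the 4-space variant visible in the hunks) over every file, replacing tab indentation and re-wrapping long signatures at the 100-column limit. As a rough illustration with a made-up interface (not code from this commit), the formatter turns

    import java.io.IOException;

    // Before: tab-indented, parameters on one long line (the pre-commit style).
    interface DemoBefore {
        void putData(String tableName, String rowKey, String columnFamily, String columnQualifier, String value) throws IOException;
    }

into

    // After google-java-format: 4-space indents, one wrapped parameter per line.
    interface DemoAfter {
        void putData(
                String tableName,
                String rowKey,
                String columnFamily,
                String columnQualifier,
                String value)
                throws IOException;
    }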

diff --git a/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/HBaseResource.java b/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/HBaseResource.java
index 83bf249..a085b1d 100644
--- a/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/HBaseResource.java
+++ b/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/HBaseResource.java
@@ -24,50 +24,56 @@ import org.apache.flink.util.ExternalResource;
 import java.io.IOException;
 import java.util.List;
 
-/**
- * Generic interface for interacting with HBase.
- */
+/** Generic interface for interacting with HBase. */
 public interface HBaseResource extends ExternalResource {
 
-	/**
-	 * Creates a table with the given name and column families.
-	 *
-	 * @param tableName      desired table name
-	 * @param columnFamilies column families to create
-	 * @throws IOException
-	 */
-	void createTable(String tableName, String... columnFamilies) throws IOException;
+    /**
+     * Creates a table with the given name and column families.
+     *
+     * @param tableName desired table name
+     * @param columnFamilies column families to create
+     * @throws IOException
+     */
+    void createTable(String tableName, String... columnFamilies) throws IOException;
 
-	/**
-	 * Scan the given HBase table.
-	 *
-	 * @param tableName name of the table to scan
-	 * @throws IOException
-	 */
-	List<String> scanTable(String tableName) throws IOException;
+    /**
+     * Scan the given HBase table.
+     *
+     * @param tableName name of the table to scan
+     * @throws IOException
+     */
+    List<String> scanTable(String tableName) throws IOException;
 
-	/**
-	 * Put the given data to the given table.
-	 *
-	 * @param tableName       table to put the data into
-	 * @param rowKey          row key of the given data
-	 * @param columnFamily    column family of the given data
-	 * @param columnQualifier  column qualifier of the given data
-	 * @param value           value of the given data
-	 * @throws IOException
-	 */
-	void putData(String tableName, String rowKey, String columnFamily, String columnQualifier, String value) throws IOException;
+    /**
+     * Put the given data to the given table.
+     *
+     * @param tableName table to put the data into
+     * @param rowKey row key of the given data
+     * @param columnFamily column family of the given data
+     * @param columnQualifier column qualifier of the given data
+     * @param value value of the given data
+     * @throws IOException
+     */
+    void putData(
+            String tableName,
+            String rowKey,
+            String columnFamily,
+            String columnQualifier,
+            String value)
+            throws IOException;
 
-	/**
-	 * Returns the configured HBaseResource implementation, or a {@link LocalStandaloneHBaseResource} if none is configured.
-	 *
-	 * @param version The hbase version
-	 * @return configured HBaseResource, or {@link LocalStandaloneHBaseResource} if none is configured
-	 */
-	static HBaseResource get(String version) {
-		return FactoryUtils.loadAndInvokeFactory(
-			HBaseResourceFactory.class,
-			factory -> factory.create(version),
-			LocalStandaloneHBaseResourceFactory::new);
-	}
+    /**
+     * Returns the configured HBaseResource implementation, or a {@link
+     * LocalStandaloneHBaseResource} if none is configured.
+     *
+     * @param version The hbase version
+     * @return configured HBaseResource, or {@link LocalStandaloneHBaseResource} if none is
+     *     configured
+     */
+    static HBaseResource get(String version) {
+        return FactoryUtils.loadAndInvokeFactory(
+                HBaseResourceFactory.class,
+                factory -> factory.create(version),
+                LocalStandaloneHBaseResourceFactory::new);
+    }
 }
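
To make the interface above concrete, here is a minimal usage sketch; the class and table names are hypothetical, and it assumes the same JUnit 4 setup that SQLClientHBaseITCase uses later in this commit:

    import java.util.List;

    import org.junit.Assert;
    import org.junit.Rule;
    import org.junit.Test;

    public class HBaseResourceUsageSketch {

        // Registering the resource as a rule lets JUnit drive its lifecycle
        // (before()/afterTestSuccess()), just as SQLClientHBaseITCase does.
        @Rule public final HBaseResource hbase = HBaseResource.get("2.2.3");

        @Test
        public void roundTrip() throws Exception {
            hbase.createTable("demo", "family1");
            hbase.putData("demo", "row1", "family1", "q1", "v1");
            List<String> rows = hbase.scanTable("demo"); // raw `scan` output lines
            Assert.assertFalse(rows.isEmpty());
        }
    }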
diff --git a/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/HBaseResourceFactory.java b/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/HBaseResourceFactory.java
index 27b3864..aec789f 100644
--- a/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/HBaseResourceFactory.java
+++ b/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/HBaseResourceFactory.java
@@ -20,19 +20,18 @@ package org.apache.flink.tests.util.hbase;
 
 import java.util.Optional;
 
-/**
- * A factory for {@link HBaseResource} implementations.
- */
+/** A factory for {@link HBaseResource} implementations. */
 @FunctionalInterface
 public interface HBaseResourceFactory {
 
-	/**
-	 * Returns a {@link HBaseResource} instance. If the instance could not be instantiated (for example, because a
-	 * mandatory parameter was missing), then an empty {@link Optional} should be returned.
-	 *
-	 * @param version The hbase version
-	 * @return HBaseResource instance
-	 * @throws Exception if the instance could not be instantiated
-	 */
-	HBaseResource create(String version) throws Exception;
+    /**
+     * Returns a {@link HBaseResource} instance. If the instance could not be instantiated (for
+     * example, because a mandatory parameter was missing), then an empty {@link Optional} should be
+     * returned.
+     *
+     * @param version The hbase version
+     * @return HBaseResource instance
+     * @throws Exception if the instance could not be instantiated
+     */
+    HBaseResource create(String version) throws Exception;
 }
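
FactoryUtils.loadAndInvokeFactory (called from HBaseResource.get above) is not part of this commit, so the discovery mechanism is an assumption here; the contract suggests a service lookup with a fallback, roughly like this JDK ServiceLoader sketch:

    import java.util.ServiceLoader;
    import java.util.function.Supplier;

    final class FactoryLookupSketch {

        // Use the first factory registered via META-INF/services; if none is
        // present, fall back to the supplied default (in HBaseResource.get that
        // default is LocalStandaloneHBaseResourceFactory::new).
        static HBaseResource load(String version, Supplier<HBaseResourceFactory> fallback)
                throws Exception {
            for (HBaseResourceFactory factory : ServiceLoader.load(HBaseResourceFactory.class)) {
                return factory.create(version);
            }
            return fallback.get().create(version);
        }
    }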
diff --git a/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/LocalStandaloneHBaseResource.java b/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/LocalStandaloneHBaseResource.java
index d8c03d7..43df763 100644
--- a/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/LocalStandaloneHBaseResource.java
+++ b/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/LocalStandaloneHBaseResource.java
@@ -40,192 +40,207 @@ import java.util.function.Consumer;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
-/**
- * {@link HBaseResource} that downloads HBase and sets up a local HBase cluster.
- */
+/** {@link HBaseResource} that downloads HBase and sets up a local HBase cluster. */
 public class LocalStandaloneHBaseResource implements HBaseResource {
 
-	private static final Logger LOG = LoggerFactory.getLogger(LocalStandaloneHBaseResource.class);
-
-	private static final int MAX_RETRIES = 3;
-	private static final int RETRY_INTERVAL_SECONDS = 30;
-	private final TemporaryFolder tmp = new TemporaryFolder();
-
-	private final DownloadCache downloadCache = DownloadCache.get();
-	private final String hbaseVersion;
-	private Path hbaseDir;
-
-	LocalStandaloneHBaseResource(String hbaseVersion) {
-		OperatingSystemRestriction.forbid(
-			String.format("The %s relies on UNIX utils and shell scripts.", getClass().getSimpleName()),
-			OperatingSystem.WINDOWS);
-		this.hbaseVersion = hbaseVersion;
-	}
-
-	private String getHBaseDownloadUrl() {
-		return String.format("https://archive.apache.org/dist/hbase/%1$s/hbase-%1$s-bin.tar.gz", hbaseVersion);
-	}
-
-	@Override
-	public void before() throws Exception {
-		tmp.create();
-		downloadCache.before();
-
-		this.hbaseDir = tmp.newFolder("hbase-" + hbaseVersion).toPath().toAbsolutePath();
-		setupHBaseDist();
-		setupHBaseCluster();
-	}
-
-	private void setupHBaseDist() throws IOException {
-		final Path downloadDirectory = tmp.newFolder("getOrDownload").toPath();
-		final Path hbaseArchive = downloadCache.getOrDownload(getHBaseDownloadUrl(), downloadDirectory);
-
-		LOG.info("HBase location: {}", hbaseDir.toAbsolutePath());
-		AutoClosableProcess.runBlocking(CommandLineWrapper
-			.tar(hbaseArchive)
-			.extract()
-			.zipped()
-			.strip(1)
-			.targetDir(hbaseDir)
-			.build());
-
-		LOG.info("Configure {} as hbase.tmp.dir", hbaseDir.toAbsolutePath());
-		final String tmpDirConfig = "<configuration><property><name>hbase.tmp.dir</name><value>" + hbaseDir + "</value></property></configuration>";
-		Files.write(hbaseDir.resolve(Paths.get("conf", "hbase-site.xml")), tmpDirConfig.getBytes());
-	}
-
-	private void setupHBaseCluster() throws IOException {
-		LOG.info("Starting HBase cluster...");
-		runHBaseProcessWithRetry("start-hbase.sh", () -> !isHMasterRunning());
-		LOG.info("HBase cluster started successfully");
-	}
-
-	@Override
-	public void afterTestSuccess() {
-		shutdownResource();
-		downloadCache.afterTestSuccess();
-		tmp.delete();
-	}
-
-	private void shutdownResource() {
-		LOG.info("Stopping HBase Cluster...");
-		try {
-			runHBaseProcessWithRetry("stop-hbase.sh", () -> isHMasterAlive());
-		} catch (IOException ioe) {
-			LOG.warn("Error when shutting down HBase Cluster.", ioe);
-		}
-		LOG.info("HBase cluster stopped successfully");
-	}
-
-	private void runHBaseProcessWithRetry(String command, Supplier<Boolean> processStatusChecker) throws IOException {
-		LOG.info("Execute {} for HBase Cluster", command);
-
-		for (int i = 1; i <= MAX_RETRIES; i++) {
-			try {
-				AutoClosableProcess.runBlocking(
-						hbaseDir.resolve(Paths.get("bin", command)).toString());
-			} catch (IOException ioe) {
-				LOG.warn("Caught exception while executing {}", command, ioe);
-			}
-
-			int waitSecond = 0;
-			while (processStatusChecker.get()) {
-				try {
-					LOG.info("Waiting for HBase {} to take effect", command);
-					Thread.sleep(1000L);
-				} catch (InterruptedException e) {
-					LOG.warn("sleep interrupted", e);
-				}
-				waitSecond++;
-				if (waitSecond > RETRY_INTERVAL_SECONDS) {
-					break;
-				}
-			}
-
-			if (waitSecond < RETRY_INTERVAL_SECONDS) {
-				break;
-			} else {
-				if (i == MAX_RETRIES) {
-					LOG.error("Execute {} failed, retry times {}", command, i);
-					throw new IllegalArgumentException(String.format(
-							"Execute %s failed after retrying %s times", command, i));
-				} else {
-					LOG.warn("Execute {} failed, retry times {}", command, i);
-				}
-			}
-		}
-	}
-
-	private boolean isHMasterRunning() {
-		try {
-			final AtomicBoolean atomicHMasterStarted = new AtomicBoolean(false);
-			queryHBaseStatus(line ->
-					atomicHMasterStarted.compareAndSet(false, line.contains("hbase:namespace")));
-			return atomicHMasterStarted.get();
-		} catch (IOException ioe) {
-			return false;
-		}
-	}
-
-	private void queryHBaseStatus(final Consumer<String> stdoutProcessor) throws IOException {
-		executeHBaseShell("scan 'hbase:meta'", stdoutProcessor);
-	}
-
-	private boolean isHMasterAlive() {
-		try {
-			final AtomicBoolean atomicHMasterStarted = new AtomicBoolean(false);
-			queryHBaseProcess(line ->
-					atomicHMasterStarted.compareAndSet(false, line.contains("HMaster")));
-			return atomicHMasterStarted.get();
-		} catch (IOException ioe) {
-			return false;
-		}
-	}
-
-	private void queryHBaseProcess(final Consumer<String> stdoutProcessor) throws IOException {
-		AutoClosableProcess
-				.create("jps")
-				.setStdoutProcessor(stdoutProcessor)
-				.runBlocking();
-	}
-
-	@Override
-	public void createTable(String tableName, String... columnFamilies) throws IOException {
-		final String createTable = String.format("create '%s',", tableName) +
-			Arrays.stream(columnFamilies)
-				.map(cf -> String.format("{NAME=>'%s'}", cf))
-				.collect(Collectors.joining(","));
-
-		executeHBaseShell(createTable);
-	}
-
-	@Override
-	public List<String> scanTable(String tableName) throws IOException {
-		final List<String> result = new ArrayList<>();
-		executeHBaseShell(String.format("scan '%s'", tableName), line -> {
-			if (line.contains("value=")) {
-				result.add(line);
-			}
-		});
-		return result;
-	}
-
-	@Override
-	public void putData(String tableName, String rowKey, String columnFamily, String columnQualifier, String value) throws IOException {
-		executeHBaseShell(
-			String.format("put '%s','%s','%s:%s','%s'", tableName, rowKey, columnFamily, columnQualifier, value));
-	}
-
-	private void executeHBaseShell(String cmd) throws IOException {
-		executeHBaseShell(cmd, line -> {
-		});
-	}
-
-	private void executeHBaseShell(String cmd, Consumer<String> stdoutProcessor) throws IOException {
-		AutoClosableProcess
-			.create(hbaseDir.resolve(Paths.get("bin", "hbase")).toString(), "shell")
-			.setStdoutProcessor(stdoutProcessor)
-			.setStdInputs(cmd)
-			.runBlocking();
-	}
+    private static final Logger LOG = LoggerFactory.getLogger(LocalStandaloneHBaseResource.class);
+
+    private static final int MAX_RETRIES = 3;
+    private static final int RETRY_INTERVAL_SECONDS = 30;
+    private final TemporaryFolder tmp = new TemporaryFolder();
+
+    private final DownloadCache downloadCache = DownloadCache.get();
+    private final String hbaseVersion;
+    private Path hbaseDir;
+
+    LocalStandaloneHBaseResource(String hbaseVersion) {
+        OperatingSystemRestriction.forbid(
+                String.format(
+                        "The %s relies on UNIX utils and shell scripts.",
+                        getClass().getSimpleName()),
+                OperatingSystem.WINDOWS);
+        this.hbaseVersion = hbaseVersion;
+    }
+
+    private String getHBaseDownloadUrl() {
+        return String.format(
+                "https://archive.apache.org/dist/hbase/%1$s/hbase-%1$s-bin.tar.gz", hbaseVersion);
+    }
+
+    @Override
+    public void before() throws Exception {
+        tmp.create();
+        downloadCache.before();
+
+        this.hbaseDir = tmp.newFolder("hbase-" + hbaseVersion).toPath().toAbsolutePath();
+        setupHBaseDist();
+        setupHBaseCluster();
+    }
+
+    private void setupHBaseDist() throws IOException {
+        final Path downloadDirectory = tmp.newFolder("getOrDownload").toPath();
+        final Path hbaseArchive =
+                downloadCache.getOrDownload(getHBaseDownloadUrl(), downloadDirectory);
+
+        LOG.info("HBase location: {}", hbaseDir.toAbsolutePath());
+        AutoClosableProcess.runBlocking(
+                CommandLineWrapper.tar(hbaseArchive)
+                        .extract()
+                        .zipped()
+                        .strip(1)
+                        .targetDir(hbaseDir)
+                        .build());
+
+        LOG.info("Configure {} as hbase.tmp.dir", hbaseDir.toAbsolutePath());
+        final String tmpDirConfig =
+                "<configuration><property><name>hbase.tmp.dir</name><value>"
+                        + hbaseDir
+                        + "</value></property></configuration>";
+        Files.write(hbaseDir.resolve(Paths.get("conf", "hbase-site.xml")), tmpDirConfig.getBytes());
+    }
+
+    private void setupHBaseCluster() throws IOException {
+        LOG.info("Starting HBase cluster...");
+        runHBaseProcessWithRetry("start-hbase.sh", () -> !isHMasterRunning());
+        LOG.info("Start HBase cluster success");
+    }
+
+    @Override
+    public void afterTestSuccess() {
+        shutdownResource();
+        downloadCache.afterTestSuccess();
+        tmp.delete();
+    }
+
+    private void shutdownResource() {
+        LOG.info("Stopping HBase Cluster...");
+        try {
+            runHBaseProcessWithRetry("stop-hbase.sh", () -> isHMasterAlive());
+        } catch (IOException ioe) {
+            LOG.warn("Error when shutting down HBase Cluster.", ioe);
+        }
+        LOG.info("Stop HBase Cluster success");
+    }
+
+    private void runHBaseProcessWithRetry(String command, Supplier<Boolean> processStatusChecker)
+            throws IOException {
+        LOG.info("Execute {} for HBase Cluster", command);
+
+        for (int i = 1; i <= MAX_RETRIES; i++) {
+            try {
+                AutoClosableProcess.runBlocking(
+                        hbaseDir.resolve(Paths.get("bin", command)).toString());
+            } catch (IOException ioe) {
+                LOG.warn("Get exception when execute {} ", command, ioe);
+            }
+
+            int waitSecond = 0;
+            while (processStatusChecker.get()) {
+                try {
+                    LOG.info("Waiting for HBase {} works", command);
+                    Thread.sleep(1000L);
+                } catch (InterruptedException e) {
+                    LOG.warn("sleep interrupted", e);
+                }
+                waitSecond++;
+                if (waitSecond > RETRY_INTERVAL_SECONDS) {
+                    break;
+                }
+            }
+
+            if (waitSecond < RETRY_INTERVAL_SECONDS) {
+                break;
+            } else {
+                if (i == MAX_RETRIES) {
+                    LOG.error("Execute {} failed, retry times {}", command, i);
+                    throw new IllegalArgumentException(
+                            String.format("Execute %s failed aftert retry %s times", command, i));
+                } else {
+                    LOG.warn("Execute {} failed, retry times {}", command, i);
+                }
+            }
+        }
+    }
+
+    private boolean isHMasterRunning() {
+        try {
+            final AtomicBoolean atomicHMasterStarted = new AtomicBoolean(false);
+            queryHBaseStatus(
+                    line ->
+                            atomicHMasterStarted.compareAndSet(
+                                    false, line.contains("hbase:namespace")));
+            return atomicHMasterStarted.get();
+        } catch (IOException ioe) {
+            return false;
+        }
+    }
+
+    private void queryHBaseStatus(final Consumer<String> stdoutProcessor) throws IOException {
+        executeHBaseShell("scan 'hbase:meta'", stdoutProcessor);
+    }
+
+    private boolean isHMasterAlive() {
+        try {
+            final AtomicBoolean atomicHMasterStarted = new AtomicBoolean(false);
+            queryHBaseProcess(
+                    line -> atomicHMasterStarted.compareAndSet(false, line.contains("HMaster")));
+            return atomicHMasterStarted.get();
+        } catch (IOException ioe) {
+            return false;
+        }
+    }
+
+    private void queryHBaseProcess(final Consumer<String> stdoutProcessor) throws IOException {
+        AutoClosableProcess.create("jps").setStdoutProcessor(stdoutProcessor).runBlocking();
+    }
+
+    @Override
+    public void createTable(String tableName, String... columnFamilies) throws IOException {
+        final String createTable =
+                String.format("create '%s',", tableName)
+                        + Arrays.stream(columnFamilies)
+                                .map(cf -> String.format("{NAME=>'%s'}", cf))
+                                .collect(Collectors.joining(","));
+
+        executeHBaseShell(createTable);
+    }
+
+    @Override
+    public List<String> scanTable(String tableName) throws IOException {
+        final List<String> result = new ArrayList<>();
+        executeHBaseShell(
+                String.format("scan '%s'", tableName),
+                line -> {
+                    if (line.contains("value=")) {
+                        result.add(line);
+                    }
+                });
+        return result;
+    }
+
+    @Override
+    public void putData(
+            String tableName,
+            String rowKey,
+            String columnFamily,
+            String columnQualifier,
+            String value)
+            throws IOException {
+        executeHBaseShell(
+                String.format(
+                        "put '%s','%s','%s:%s','%s'",
+                        tableName, rowKey, columnFamily, columnQualifier, value));
+    }
+
+    private void executeHBaseShell(String cmd) throws IOException {
+        executeHBaseShell(cmd, line -> {});
+    }
+
+    private void executeHBaseShell(String cmd, Consumer<String> stdoutProcessor)
+            throws IOException {
+        AutoClosableProcess.create(hbaseDir.resolve(Paths.get("bin", "hbase")).toString(), "shell")
+                .setStdoutProcessor(stdoutProcessor)
+                .setStdInputs(cmd)
+                .runBlocking();
+    }
 }
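
The heart of the class above is the loop in runHBaseProcessWithRetry: run a start/stop script, poll a status check once per second for up to RETRY_INTERVAL_SECONDS, and re-run the script up to MAX_RETRIES times. Distilled into a standalone, JDK-only sketch (the helper name and signature are hypothetical):

    import java.util.function.BooleanSupplier;

    final class RetryLoopSketch {

        // Runs `action`, then waits for `stillPending` to turn false, polling once
        // per second for at most `pollSeconds`; retries the whole cycle up to
        // `maxRetries` times before giving up.
        static void runWithRetry(
                Runnable action, BooleanSupplier stillPending, int maxRetries, int pollSeconds)
                throws InterruptedException {
            for (int attempt = 1; attempt <= maxRetries; attempt++) {
                action.run();
                int waited = 0;
                while (waited < pollSeconds && stillPending.getAsBoolean()) {
                    Thread.sleep(1000L);
                    waited++;
                }
                if (!stillPending.getAsBoolean()) {
                    return; // the action took effect
                }
            }
            throw new IllegalStateException("Action failed after " + maxRetries + " attempts");
        }
    }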
diff --git a/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/LocalStandaloneHBaseResourceFactory.java b/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/LocalStandaloneHBaseResourceFactory.java
index 2e57f0f..d624668 100644
--- a/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/LocalStandaloneHBaseResourceFactory.java
+++ b/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/LocalStandaloneHBaseResourceFactory.java
@@ -18,13 +18,11 @@
 
 package org.apache.flink.tests.util.hbase;
 
-/**
- * A {@link HBaseResourceFactory} for the {@link LocalStandaloneHBaseResource}.
- */
+/** A {@link HBaseResourceFactory} for the {@link LocalStandaloneHBaseResource}. */
 public class LocalStandaloneHBaseResourceFactory implements HBaseResourceFactory {
 
-	@Override
-	public HBaseResource create(String version) {
-		return new LocalStandaloneHBaseResource(version);
-	}
+    @Override
+    public HBaseResource create(String version) {
+        return new LocalStandaloneHBaseResource(version);
+    }
 }
diff --git a/flink-connector-hbase-e2e-tests/src/test/java/org/apache/flink/tests/util/hbase/SQLClientHBaseITCase.java b/flink-connector-hbase-e2e-tests/src/test/java/org/apache/flink/tests/util/hbase/SQLClientHBaseITCase.java
index 1bd51b3..b8c556d 100644
--- a/flink-connector-hbase-e2e-tests/src/test/java/org/apache/flink/tests/util/hbase/SQLClientHBaseITCase.java
+++ b/flink-connector-hbase-e2e-tests/src/test/java/org/apache/flink/tests/util/hbase/SQLClientHBaseITCase.java
@@ -65,159 +65,177 @@ import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
 import static org.junit.Assert.assertThat;
 
-/**
- * End-to-end test for the HBase connectors.
- */
+/** End-to-end test for the HBase connectors. */
 @RunWith(Parameterized.class)
 @Category(value = {TravisGroup1.class, PreCommit.class, FailsOnJava11.class})
 public class SQLClientHBaseITCase extends TestLogger {
 
-	private static final Logger LOG = LoggerFactory.getLogger(SQLClientHBaseITCase.class);
-
-	private static final String HBASE_E2E_SQL = "hbase_e2e.sql";
-
-	@Parameterized.Parameters(name = "{index}: hbase-version:{0}")
-	public static Collection<Object[]> data() {
-		return Arrays.asList(new Object[][]{
-				{"1.4.3", "hbase-1.4"},
-				{"2.2.3", "hbase-2.2"}
-		});
-	}
-
-	@Rule
-	public final HBaseResource hbase;
-
-	@Rule
-	public final FlinkResource flink = new LocalStandaloneFlinkResourceFactory()
-		.create(FlinkResourceSetup.builder().build());
-
-	@Rule
-	public final TemporaryFolder tmp = new TemporaryFolder();
-
-	private final String hbaseConnector;
-	private final Path sqlConnectorHBaseJar;
-
-	@ClassRule
-	public static final DownloadCache DOWNLOAD_CACHE = DownloadCache.get();
-
-	private static final Path sqlToolBoxJar = TestUtils.getResource(".*SqlToolbox.jar");
-	private static final Path hadoopClasspath = TestUtils.getResource(".*hadoop.classpath");
-	private List<Path> hadoopClasspathJars;
-
-	public SQLClientHBaseITCase(String hbaseVersion, String hbaseConnector) {
-		this.hbase = HBaseResource.get(hbaseVersion);
-		this.hbaseConnector = hbaseConnector;
-		this.sqlConnectorHBaseJar = TestUtils.getResource(".*sql-" + hbaseConnector + ".jar");
-	}
-
-	@Before
-	public void before() throws Exception {
-		DOWNLOAD_CACHE.before();
-		Path tmpPath = tmp.getRoot().toPath();
-		LOG.info("The current temporary path: {}", tmpPath);
-
-		// Prepare all Hadoop jars to mock HADOOP_CLASSPATH, using hadoop.classpath, which contains all Hadoop jars
-		File hadoopClasspathFile = new File(hadoopClasspath.toAbsolutePath().toString());
-
-		if (!hadoopClasspathFile.exists()) {
-			throw new FileNotFoundException("File that contains hadoop classpath " + hadoopClasspath.toString()
-				+ " does not exist.");
-		}
-
-		String classPathContent = FileUtils.readFileUtf8(hadoopClasspathFile);
-		hadoopClasspathJars = Arrays.stream(classPathContent.split(":"))
-			.map(jar -> Paths.get(jar))
-			.collect(Collectors.toList());
-	}
-
-	@Test
-	public void testHBase() throws Exception {
-		try (ClusterController clusterController = flink.startCluster(2)) {
-			// Create table and put data
-			hbase.createTable("source", "family1", "family2");
-			hbase.createTable("sink", "family1", "family2");
-			hbase.putData("source", "row1", "family1", "f1c1", "v1");
-			hbase.putData("source", "row1", "family2", "f2c1", "v2");
-			hbase.putData("source", "row1", "family2", "f2c2", "v3");
-			hbase.putData("source", "row2", "family1", "f1c1", "v4");
-			hbase.putData("source", "row2", "family2", "f2c1", "v5");
-			hbase.putData("source", "row2", "family2", "f2c2", "v6");
-
-			// Initialize the SQL statements from "hbase_e2e.sql" file
-			Map<String, String> varsMap = new HashMap<>();
-			varsMap.put("$HBASE_CONNECTOR", hbaseConnector);
-			List<String> sqlLines = initializeSqlLines(varsMap);
-
-			// Execute SQL statements in "hbase_e2e.sql" file
-			executeSqlStatements(clusterController, sqlLines);
-
-			LOG.info("Verify the sink table result.");
-			// Wait until all the results are flushed to the HBase sink table.
-			checkHBaseSinkResult();
-			LOG.info("The HBase SQL client test ran successfully.");
-		}
-
-	}
-
-	private void checkHBaseSinkResult() throws Exception {
-		boolean success = false;
-		final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(120));
-		while (deadline.hasTimeLeft()) {
-			final List<String> lines = hbase.scanTable("sink");
-			if (lines.size() == 6) {
-				success = true;
-				assertThat(
-					lines.toArray(new String[0]),
-					arrayContainingInAnyOrder(
-						CoreMatchers.allOf(containsString("row1"), containsString("family1"),
-							containsString("f1c1"), containsString("value1")),
-						CoreMatchers.allOf(containsString("row1"), containsString("family2"),
-							containsString("f2c1"), containsString("v2")),
-						CoreMatchers.allOf(containsString("row1"), containsString("family2"),
-							containsString("f2c2"), containsString("v3")),
-						CoreMatchers.allOf(containsString("row2"), containsString("family1"),
-							containsString("f1c1"), containsString("value4")),
-						CoreMatchers.allOf(containsString("row2"), containsString("family2"),
-							containsString("f2c1"), containsString("v5")),
-						CoreMatchers.allOf(containsString("row2"), containsString("family2"),
-							containsString("f2c2"), containsString("v6"))
-					)
-				);
-				break;
-			} else {
-			LOG.info("The HBase sink table does not contain enough records yet: {} records, time left: {}s",
-					lines.size(), deadline.timeLeft().getSeconds());
-			}
-			Thread.sleep(500);
-		}
-		Assert.assertTrue("Did not get expected results before timeout.", success);
-	}
-
-	private List<String> initializeSqlLines(Map<String, String> vars) throws IOException {
-		URL url = SQLClientHBaseITCase.class.getClassLoader().getResource(HBASE_E2E_SQL);
-		if (url == null) {
-			throw new FileNotFoundException(HBASE_E2E_SQL);
-		}
-		List<String> lines = Files.readAllLines(new File(url.getFile()).toPath());
-		List<String> result = new ArrayList<>();
-		for (String line : lines) {
-			for (Map.Entry<String, String> var : vars.entrySet()) {
-				line = line.replace(var.getKey(), var.getValue());
-			}
-			result.add(line);
-		}
-
-		return result;
-	}
-
-	private void executeSqlStatements(ClusterController clusterController, List<String> sqlLines) throws IOException {
-		LOG.info("Executing SQL: HBase source table -> HBase sink table");
-		clusterController.submitSQLJob(
-			new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
-				.addJar(sqlToolBoxJar)
-				.addJar(sqlConnectorHBaseJar)
-				.addJars(hadoopClasspathJars)
-				.build(),
-			Duration.ofMinutes(2L));
-	}
+    private static final Logger LOG = LoggerFactory.getLogger(SQLClientHBaseITCase.class);
+
+    private static final String HBASE_E2E_SQL = "hbase_e2e.sql";
+
+    @Parameterized.Parameters(name = "{index}: hbase-version:{0}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(
+                new Object[][] {
+                    {"1.4.3", "hbase-1.4"},
+                    {"2.2.3", "hbase-2.2"}
+                });
+    }
+
+    @Rule public final HBaseResource hbase;
+
+    @Rule
+    public final FlinkResource flink =
+            new LocalStandaloneFlinkResourceFactory().create(FlinkResourceSetup.builder().build());
+
+    @Rule public final TemporaryFolder tmp = new TemporaryFolder();
+
+    private final String hbaseConnector;
+    private final Path sqlConnectorHBaseJar;
+
+    @ClassRule public static final DownloadCache DOWNLOAD_CACHE = DownloadCache.get();
+
+    private static final Path sqlToolBoxJar = TestUtils.getResource(".*SqlToolbox.jar");
+    private static final Path hadoopClasspath = TestUtils.getResource(".*hadoop.classpath");
+    private List<Path> hadoopClasspathJars;
+
+    public SQLClientHBaseITCase(String hbaseVersion, String hbaseConnector) {
+        this.hbase = HBaseResource.get(hbaseVersion);
+        this.hbaseConnector = hbaseConnector;
+        this.sqlConnectorHBaseJar = TestUtils.getResource(".*sql-" + hbaseConnector + ".jar");
+    }
+
+    @Before
+    public void before() throws Exception {
+        DOWNLOAD_CACHE.before();
+        Path tmpPath = tmp.getRoot().toPath();
+        LOG.info("The current temporary path: {}", tmpPath);
+
+        // Prepare all Hadoop jars to mock HADOOP_CLASSPATH, using hadoop.classpath, which
+        // contains all Hadoop jars
+        File hadoopClasspathFile = new File(hadoopClasspath.toAbsolutePath().toString());
+
+        if (!hadoopClasspathFile.exists()) {
+            throw new FileNotFoundException(
+                    "File that contains hadoop classpath "
+                            + hadoopClasspath.toString()
+                            + " does not exist.");
+        }
+
+        String classPathContent = FileUtils.readFileUtf8(hadoopClasspathFile);
+        hadoopClasspathJars =
+                Arrays.stream(classPathContent.split(":"))
+                        .map(jar -> Paths.get(jar))
+                        .collect(Collectors.toList());
+    }
+
+    @Test
+    public void testHBase() throws Exception {
+        try (ClusterController clusterController = flink.startCluster(2)) {
+            // Create table and put data
+            hbase.createTable("source", "family1", "family2");
+            hbase.createTable("sink", "family1", "family2");
+            hbase.putData("source", "row1", "family1", "f1c1", "v1");
+            hbase.putData("source", "row1", "family2", "f2c1", "v2");
+            hbase.putData("source", "row1", "family2", "f2c2", "v3");
+            hbase.putData("source", "row2", "family1", "f1c1", "v4");
+            hbase.putData("source", "row2", "family2", "f2c1", "v5");
+            hbase.putData("source", "row2", "family2", "f2c2", "v6");
+
+            // Initialize the SQL statements from "hbase_e2e.sql" file
+            Map<String, String> varsMap = new HashMap<>();
+            varsMap.put("$HBASE_CONNECTOR", hbaseConnector);
+            List<String> sqlLines = initializeSqlLines(varsMap);
+
+            // Execute SQL statements in "hbase_e2e.sql" file
+            executeSqlStatements(clusterController, sqlLines);
+
+            LOG.info("Verify the sink table result.");
+            // Wait until all the results are flushed to the HBase sink table.
+            checkHBaseSinkResult();
+            LOG.info("The HBase SQL client test run successfully.");
+        }
+    }
+
+    private void checkHBaseSinkResult() throws Exception {
+        boolean success = false;
+        final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(120));
+        while (deadline.hasTimeLeft()) {
+            final List<String> lines = hbase.scanTable("sink");
+            if (lines.size() == 6) {
+                success = true;
+                assertThat(
+                        lines.toArray(new String[0]),
+                        arrayContainingInAnyOrder(
+                                CoreMatchers.allOf(
+                                        containsString("row1"),
+                                        containsString("family1"),
+                                        containsString("f1c1"),
+                                        containsString("value1")),
+                                CoreMatchers.allOf(
+                                        containsString("row1"),
+                                        containsString("family2"),
+                                        containsString("f2c1"),
+                                        containsString("v2")),
+                                CoreMatchers.allOf(
+                                        containsString("row1"),
+                                        containsString("family2"),
+                                        containsString("f2c2"),
+                                        containsString("v3")),
+                                CoreMatchers.allOf(
+                                        containsString("row2"),
+                                        containsString("family1"),
+                                        containsString("f1c1"),
+                                        containsString("value4")),
+                                CoreMatchers.allOf(
+                                        containsString("row2"),
+                                        containsString("family2"),
+                                        containsString("f2c1"),
+                                        containsString("v5")),
+                                CoreMatchers.allOf(
+                                        containsString("row2"),
+                                        containsString("family2"),
+                                        containsString("f2c2"),
+                                        containsString("v6"))));
+                break;
+            } else {
+                LOG.info(
+                        "The HBase sink table does not contain enough records, current {} records, left time: {}s",
+                        lines.size(),
+                        deadline.timeLeft().getSeconds());
+            }
+            Thread.sleep(500);
+        }
+        Assert.assertTrue("Did not get expected results before timeout.", success);
+    }
+
+    private List<String> initializeSqlLines(Map<String, String> vars) throws IOException {
+        URL url = SQLClientHBaseITCase.class.getClassLoader().getResource(HBASE_E2E_SQL);
+        if (url == null) {
+            throw new FileNotFoundException(HBASE_E2E_SQL);
+        }
+        List<String> lines = Files.readAllLines(new File(url.getFile()).toPath());
+        List<String> result = new ArrayList<>();
+        for (String line : lines) {
+            for (Map.Entry<String, String> var : vars.entrySet()) {
+                line = line.replace(var.getKey(), var.getValue());
+            }
+            result.add(line);
+        }
+
+        return result;
+    }
+
+    private void executeSqlStatements(ClusterController clusterController, List<String> sqlLines)
+            throws IOException {
+        LOG.info("Executing SQL: HBase source table -> HBase sink table");
+        clusterController.submitSQLJob(
+                new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
+                        .addJar(sqlToolBoxJar)
+                        .addJar(sqlConnectorHBaseJar)
+                        .addJars(hadoopClasspathJars)
+                        .build(),
+                Duration.ofMinutes(2L));
+    }
 }
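
checkHBaseSinkResult above follows a pattern worth naming: poll an eventually-consistent sink until it holds the expected row count or a deadline expires, and only then run the content assertions. A JDK-only sketch of that shape (hypothetical helper; the test itself uses Flink's Deadline):

    import java.time.Duration;
    import java.time.Instant;
    import java.util.List;
    import java.util.concurrent.Callable;

    final class SinkPollingSketch {

        // Polls `scan` every 500 ms (the same interval as the test) until it
        // returns `expected` lines or `timeout` elapses, then hands the lines
        // back for detailed matching.
        static List<String> pollUntilCount(
                Callable<List<String>> scan, int expected, Duration timeout) throws Exception {
            final Instant deadline = Instant.now().plus(timeout);
            while (Instant.now().isBefore(deadline)) {
                List<String> lines = scan.call();
                if (lines.size() == expected) {
                    return lines;
                }
                Thread.sleep(500L);
            }
            throw new AssertionError("Did not get expected results before timeout.");
        }
    }

With it, the wait loop in the test reduces to pollUntilCount(() -> hbase.scanTable("sink"), 6, Duration.ofSeconds(120)) followed by the assertThat block.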