You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2023/07/10 16:03:26 UTC
[flink-connector-hbase] 08/26: [FLINK-20651] Format code with Spotless/google-java-format
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-hbase.git
commit 9be77d73f9bed1d605a00e21c72c2661ade43c42
Author: Rufus Refactor <de...@flink.apache.org>
AuthorDate: Mon Dec 28 14:30:59 2020 +0100
[FLINK-20651] Format code with Spotless/google-java-format
---
.../flink/tests/util/hbase/HBaseResource.java | 88 ++---
.../tests/util/hbase/HBaseResourceFactory.java | 23 +-
.../util/hbase/LocalStandaloneHBaseResource.java | 387 +++++++++++----------
.../hbase/LocalStandaloneHBaseResourceFactory.java | 12 +-
.../tests/util/hbase/SQLClientHBaseITCase.java | 320 +++++++++--------
5 files changed, 433 insertions(+), 397 deletions(-)
diff --git a/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/HBaseResource.java b/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/HBaseResource.java
index 83bf249..a085b1d 100644
--- a/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/HBaseResource.java
+++ b/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/HBaseResource.java
@@ -24,50 +24,56 @@ import org.apache.flink.util.ExternalResource;
import java.io.IOException;
import java.util.List;
-/**
- * Generic interface for interacting with HBase.
- */
+/** Generic interface for interacting with HBase. */
public interface HBaseResource extends ExternalResource {
- /**
- * Creates a table with the given name and column families.
- *
- * @param tableName desired table name
- * @param columnFamilies column family to create
- * @throws IOException
- */
- void createTable(String tableName, String... columnFamilies) throws IOException;
+ /**
+ * Creates a table with the given name and column families.
+ *
+ * @param tableName desired table name
+ * @param columnFamilies column family to create
+ * @throws IOException
+ */
+ void createTable(String tableName, String... columnFamilies) throws IOException;
- /**
- * Scan the given HBase table.
- *
- * @param tableName table desired to scan
- * @throws IOException
- */
- List<String> scanTable(String tableName) throws IOException;
+ /**
+ * Scan the given HBase table.
+ *
+ * @param tableName table desired to scan
+ * @throws IOException
+ */
+ List<String> scanTable(String tableName) throws IOException;
- /**
- * Put the given data to the given table.
- *
- * @param tableName table to put data
- * @param rowKey row key of the given data
- * @param columnFamily column family of the given data
- * @param columnQualifier column qualifier of the given data
- * @param value value of the given data
- * @throws IOException
- */
- void putData(String tableName, String rowKey, String columnFamily, String columnQualifier, String value) throws IOException;
+ /**
+ * Put the given data to the given table.
+ *
+ * @param tableName table to put data
+ * @param rowKey row key of the given data
+ * @param columnFamily column family of the given data
+ * @param columnQualifier column qualifier of the given data
+ * @param value value of the given data
+ * @throws IOException
+ */
+ void putData(
+ String tableName,
+ String rowKey,
+ String columnFamily,
+ String columnQualifier,
+ String value)
+ throws IOException;
- /**
- * Returns the configured HBaseResource implementation, or a {@link LocalStandaloneHBaseResource} if none is configured.
- *
- * @param version The hbase version
- * @return configured HbaseResource, or {@link LocalStandaloneHBaseResource} if none is configured
- */
- static HBaseResource get(String version) {
- return FactoryUtils.loadAndInvokeFactory(
- HBaseResourceFactory.class,
- factory -> factory.create(version),
- LocalStandaloneHBaseResourceFactory::new);
- }
+ /**
+ * Returns the configured HBaseResource implementation, or a {@link
+ * LocalStandaloneHBaseResource} if none is configured.
+ *
+ * @param version The hbase version
+ * @return configured HbaseResource, or {@link LocalStandaloneHBaseResource} if none is
+ * configured
+ */
+ static HBaseResource get(String version) {
+ return FactoryUtils.loadAndInvokeFactory(
+ HBaseResourceFactory.class,
+ factory -> factory.create(version),
+ LocalStandaloneHBaseResourceFactory::new);
+ }
}
diff --git a/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/HBaseResourceFactory.java b/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/HBaseResourceFactory.java
index 27b3864..aec789f 100644
--- a/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/HBaseResourceFactory.java
+++ b/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/HBaseResourceFactory.java
@@ -20,19 +20,18 @@ package org.apache.flink.tests.util.hbase;
import java.util.Optional;
-/**
- * A factory for {@link HBaseResource} implementations.
- */
+/** A factory for {@link HBaseResource} implementations. */
@FunctionalInterface
public interface HBaseResourceFactory {
- /**
- * Returns a {@link HBaseResource} instance. If the instance could not be instantiated (for example, because a
- * mandatory parameter was missing), then an empty {@link Optional} should be returned.
- *
- * @param version The hbase version
- * @return HBaseResource instance
- * @throws Exception if the instance could not be instantiated
- */
- HBaseResource create(String version) throws Exception;
+ /**
+ * Returns a {@link HBaseResource} instance. If the instance could not be instantiated (for
+ * example, because a mandatory parameter was missing), then an empty {@link Optional} should be
+ * returned.
+ *
+ * @param version The hbase version
+ * @return HBaseResource instance
+ * @throws Exception if the instance could not be instantiated
+ */
+ HBaseResource create(String version) throws Exception;
}
diff --git a/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/LocalStandaloneHBaseResource.java b/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/LocalStandaloneHBaseResource.java
index d8c03d7..43df763 100644
--- a/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/LocalStandaloneHBaseResource.java
+++ b/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/LocalStandaloneHBaseResource.java
@@ -40,192 +40,207 @@ import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
-/**
- * {@link HBaseResource} that downloads hbase and set up a local hbase cluster.
- */
+/** {@link HBaseResource} that downloads hbase and set up a local hbase cluster. */
public class LocalStandaloneHBaseResource implements HBaseResource {
- private static final Logger LOG = LoggerFactory.getLogger(LocalStandaloneHBaseResource.class);
-
- private static final int MAX_RETRIES = 3;
- private static final int RETRY_INTERVAL_SECONDS = 30;
- private final TemporaryFolder tmp = new TemporaryFolder();
-
- private final DownloadCache downloadCache = DownloadCache.get();
- private final String hbaseVersion;
- private Path hbaseDir;
-
- LocalStandaloneHBaseResource(String hbaseVersion) {
- OperatingSystemRestriction.forbid(
- String.format("The %s relies on UNIX utils and shell scripts.", getClass().getSimpleName()),
- OperatingSystem.WINDOWS);
- this.hbaseVersion = hbaseVersion;
- }
-
- private String getHBaseDownloadUrl() {
- return String.format("https://archive.apache.org/dist/hbase/%1$s/hbase-%1$s-bin.tar.gz", hbaseVersion);
- }
-
- @Override
- public void before() throws Exception {
- tmp.create();
- downloadCache.before();
-
- this.hbaseDir = tmp.newFolder("hbase-" + hbaseVersion).toPath().toAbsolutePath();
- setupHBaseDist();
- setupHBaseCluster();
- }
-
- private void setupHBaseDist() throws IOException {
- final Path downloadDirectory = tmp.newFolder("getOrDownload").toPath();
- final Path hbaseArchive = downloadCache.getOrDownload(getHBaseDownloadUrl(), downloadDirectory);
-
- LOG.info("HBase location: {}", hbaseDir.toAbsolutePath());
- AutoClosableProcess.runBlocking(CommandLineWrapper
- .tar(hbaseArchive)
- .extract()
- .zipped()
- .strip(1)
- .targetDir(hbaseDir)
- .build());
-
- LOG.info("Configure {} as hbase.tmp.dir", hbaseDir.toAbsolutePath());
- final String tmpDirConfig = "<configuration><property><name>hbase.tmp.dir</name><value>" + hbaseDir + "</value></property></configuration>";
- Files.write(hbaseDir.resolve(Paths.get("conf", "hbase-site.xml")), tmpDirConfig.getBytes());
- }
-
- private void setupHBaseCluster() throws IOException {
- LOG.info("Starting HBase cluster...");
- runHBaseProcessWithRetry("start-hbase.sh", () -> !isHMasterRunning());
- LOG.info("Start HBase cluster success");
- }
-
- @Override
- public void afterTestSuccess() {
- shutdownResource();
- downloadCache.afterTestSuccess();
- tmp.delete();
- }
-
- private void shutdownResource() {
- LOG.info("Stopping HBase Cluster...");
- try {
- runHBaseProcessWithRetry("stop-hbase.sh", () -> isHMasterAlive());
- } catch (IOException ioe) {
- LOG.warn("Error when shutting down HBase Cluster.", ioe);
- }
- LOG.info("Stop HBase Cluster success");
- }
-
- private void runHBaseProcessWithRetry(String command, Supplier<Boolean> processStatusChecker) throws IOException {
- LOG.info("Execute {} for HBase Cluster", command);
-
- for (int i = 1; i <= MAX_RETRIES; i++) {
- try {
- AutoClosableProcess.runBlocking(
- hbaseDir.resolve(Paths.get("bin", command)).toString());
- } catch (IOException ioe) {
- LOG.warn("Get exception when execute {} ", command, ioe);
- }
-
- int waitSecond = 0;
- while (processStatusChecker.get()) {
- try {
- LOG.info("Waiting for HBase {} works", command);
- Thread.sleep(1000L);
- } catch (InterruptedException e) {
- LOG.warn("sleep interrupted", e);
- }
- waitSecond++;
- if (waitSecond > RETRY_INTERVAL_SECONDS) {
- break;
- }
- }
-
- if (waitSecond < RETRY_INTERVAL_SECONDS) {
- break;
- } else {
- if (i == MAX_RETRIES) {
- LOG.error("Execute {} failed, retry times {}", command, i);
- throw new IllegalArgumentException(String.format(
- "Execute %s failed aftert retry %s times", command, i));
- } else {
- LOG.warn("Execute {} failed, retry times {}", command, i);
- }
- }
- }
- }
-
- private boolean isHMasterRunning() {
- try {
- final AtomicBoolean atomicHMasterStarted = new AtomicBoolean(false);
- queryHBaseStatus(line ->
- atomicHMasterStarted.compareAndSet(false, line.contains("hbase:namespace")));
- return atomicHMasterStarted.get();
- } catch (IOException ioe) {
- return false;
- }
- }
-
- private void queryHBaseStatus(final Consumer<String> stdoutProcessor) throws IOException {
- executeHBaseShell("scan 'hbase:meta'", stdoutProcessor);
- }
-
- private boolean isHMasterAlive() {
- try {
- final AtomicBoolean atomicHMasterStarted = new AtomicBoolean(false);
- queryHBaseProcess(line ->
- atomicHMasterStarted.compareAndSet(false, line.contains("HMaster")));
- return atomicHMasterStarted.get();
- } catch (IOException ioe) {
- return false;
- }
- }
-
- private void queryHBaseProcess(final Consumer<String> stdoutProcessor) throws IOException {
- AutoClosableProcess
- .create("jps")
- .setStdoutProcessor(stdoutProcessor)
- .runBlocking();
- }
-
- @Override
- public void createTable(String tableName, String... columnFamilies) throws IOException {
- final String createTable = String.format("create '%s',", tableName) +
- Arrays.stream(columnFamilies)
- .map(cf -> String.format("{NAME=>'%s'}", cf))
- .collect(Collectors.joining(","));
-
- executeHBaseShell(createTable);
- }
-
- @Override
- public List<String> scanTable(String tableName) throws IOException {
- final List<String> result = new ArrayList<>();
- executeHBaseShell(String.format("scan '%s'", tableName), line -> {
- if (line.contains("value=")) {
- result.add(line);
- }
- });
- return result;
- }
-
- @Override
- public void putData(String tableName, String rowKey, String columnFamily, String columnQualifier, String value) throws IOException {
- executeHBaseShell(
- String.format("put '%s','%s','%s:%s','%s'", tableName, rowKey, columnFamily, columnQualifier, value));
- }
-
- private void executeHBaseShell(String cmd) throws IOException {
- executeHBaseShell(cmd, line -> {
- });
- }
-
- private void executeHBaseShell(String cmd, Consumer<String> stdoutProcessor) throws IOException {
- AutoClosableProcess
- .create(hbaseDir.resolve(Paths.get("bin", "hbase")).toString(), "shell")
- .setStdoutProcessor(stdoutProcessor)
- .setStdInputs(cmd)
- .runBlocking();
- }
+ private static final Logger LOG = LoggerFactory.getLogger(LocalStandaloneHBaseResource.class);
+
+ private static final int MAX_RETRIES = 3;
+ private static final int RETRY_INTERVAL_SECONDS = 30;
+ private final TemporaryFolder tmp = new TemporaryFolder();
+
+ private final DownloadCache downloadCache = DownloadCache.get();
+ private final String hbaseVersion;
+ private Path hbaseDir;
+
+ LocalStandaloneHBaseResource(String hbaseVersion) {
+ OperatingSystemRestriction.forbid(
+ String.format(
+ "The %s relies on UNIX utils and shell scripts.",
+ getClass().getSimpleName()),
+ OperatingSystem.WINDOWS);
+ this.hbaseVersion = hbaseVersion;
+ }
+
+ private String getHBaseDownloadUrl() {
+ return String.format(
+ "https://archive.apache.org/dist/hbase/%1$s/hbase-%1$s-bin.tar.gz", hbaseVersion);
+ }
+
+ @Override
+ public void before() throws Exception {
+ tmp.create();
+ downloadCache.before();
+
+ this.hbaseDir = tmp.newFolder("hbase-" + hbaseVersion).toPath().toAbsolutePath();
+ setupHBaseDist();
+ setupHBaseCluster();
+ }
+
+ private void setupHBaseDist() throws IOException {
+ final Path downloadDirectory = tmp.newFolder("getOrDownload").toPath();
+ final Path hbaseArchive =
+ downloadCache.getOrDownload(getHBaseDownloadUrl(), downloadDirectory);
+
+ LOG.info("HBase location: {}", hbaseDir.toAbsolutePath());
+ AutoClosableProcess.runBlocking(
+ CommandLineWrapper.tar(hbaseArchive)
+ .extract()
+ .zipped()
+ .strip(1)
+ .targetDir(hbaseDir)
+ .build());
+
+ LOG.info("Configure {} as hbase.tmp.dir", hbaseDir.toAbsolutePath());
+ final String tmpDirConfig =
+ "<configuration><property><name>hbase.tmp.dir</name><value>"
+ + hbaseDir
+ + "</value></property></configuration>";
+ Files.write(hbaseDir.resolve(Paths.get("conf", "hbase-site.xml")), tmpDirConfig.getBytes());
+ }
+
+ private void setupHBaseCluster() throws IOException {
+ LOG.info("Starting HBase cluster...");
+ runHBaseProcessWithRetry("start-hbase.sh", () -> !isHMasterRunning());
+ LOG.info("Start HBase cluster success");
+ }
+
+ @Override
+ public void afterTestSuccess() {
+ shutdownResource();
+ downloadCache.afterTestSuccess();
+ tmp.delete();
+ }
+
+ private void shutdownResource() {
+ LOG.info("Stopping HBase Cluster...");
+ try {
+ runHBaseProcessWithRetry("stop-hbase.sh", () -> isHMasterAlive());
+ } catch (IOException ioe) {
+ LOG.warn("Error when shutting down HBase Cluster.", ioe);
+ }
+ LOG.info("Stop HBase Cluster success");
+ }
+
+ private void runHBaseProcessWithRetry(String command, Supplier<Boolean> processStatusChecker)
+ throws IOException {
+ LOG.info("Execute {} for HBase Cluster", command);
+
+ for (int i = 1; i <= MAX_RETRIES; i++) {
+ try {
+ AutoClosableProcess.runBlocking(
+ hbaseDir.resolve(Paths.get("bin", command)).toString());
+ } catch (IOException ioe) {
+ LOG.warn("Get exception when execute {} ", command, ioe);
+ }
+
+ int waitSecond = 0;
+ while (processStatusChecker.get()) {
+ try {
+ LOG.info("Waiting for HBase {} works", command);
+ Thread.sleep(1000L);
+ } catch (InterruptedException e) {
+ LOG.warn("sleep interrupted", e);
+ }
+ waitSecond++;
+ if (waitSecond > RETRY_INTERVAL_SECONDS) {
+ break;
+ }
+ }
+
+ if (waitSecond < RETRY_INTERVAL_SECONDS) {
+ break;
+ } else {
+ if (i == MAX_RETRIES) {
+ LOG.error("Execute {} failed, retry times {}", command, i);
+ throw new IllegalArgumentException(
+ String.format("Execute %s failed aftert retry %s times", command, i));
+ } else {
+ LOG.warn("Execute {} failed, retry times {}", command, i);
+ }
+ }
+ }
+ }
+
+ private boolean isHMasterRunning() {
+ try {
+ final AtomicBoolean atomicHMasterStarted = new AtomicBoolean(false);
+ queryHBaseStatus(
+ line ->
+ atomicHMasterStarted.compareAndSet(
+ false, line.contains("hbase:namespace")));
+ return atomicHMasterStarted.get();
+ } catch (IOException ioe) {
+ return false;
+ }
+ }
+
+ private void queryHBaseStatus(final Consumer<String> stdoutProcessor) throws IOException {
+ executeHBaseShell("scan 'hbase:meta'", stdoutProcessor);
+ }
+
+ private boolean isHMasterAlive() {
+ try {
+ final AtomicBoolean atomicHMasterStarted = new AtomicBoolean(false);
+ queryHBaseProcess(
+ line -> atomicHMasterStarted.compareAndSet(false, line.contains("HMaster")));
+ return atomicHMasterStarted.get();
+ } catch (IOException ioe) {
+ return false;
+ }
+ }
+
+ private void queryHBaseProcess(final Consumer<String> stdoutProcessor) throws IOException {
+ AutoClosableProcess.create("jps").setStdoutProcessor(stdoutProcessor).runBlocking();
+ }
+
+ @Override
+ public void createTable(String tableName, String... columnFamilies) throws IOException {
+ final String createTable =
+ String.format("create '%s',", tableName)
+ + Arrays.stream(columnFamilies)
+ .map(cf -> String.format("{NAME=>'%s'}", cf))
+ .collect(Collectors.joining(","));
+
+ executeHBaseShell(createTable);
+ }
+
+ @Override
+ public List<String> scanTable(String tableName) throws IOException {
+ final List<String> result = new ArrayList<>();
+ executeHBaseShell(
+ String.format("scan '%s'", tableName),
+ line -> {
+ if (line.contains("value=")) {
+ result.add(line);
+ }
+ });
+ return result;
+ }
+
+ @Override
+ public void putData(
+ String tableName,
+ String rowKey,
+ String columnFamily,
+ String columnQualifier,
+ String value)
+ throws IOException {
+ executeHBaseShell(
+ String.format(
+ "put '%s','%s','%s:%s','%s'",
+ tableName, rowKey, columnFamily, columnQualifier, value));
+ }
+
+ private void executeHBaseShell(String cmd) throws IOException {
+ executeHBaseShell(cmd, line -> {});
+ }
+
+ private void executeHBaseShell(String cmd, Consumer<String> stdoutProcessor)
+ throws IOException {
+ AutoClosableProcess.create(hbaseDir.resolve(Paths.get("bin", "hbase")).toString(), "shell")
+ .setStdoutProcessor(stdoutProcessor)
+ .setStdInputs(cmd)
+ .runBlocking();
+ }
}
diff --git a/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/LocalStandaloneHBaseResourceFactory.java b/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/LocalStandaloneHBaseResourceFactory.java
index 2e57f0f..d624668 100644
--- a/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/LocalStandaloneHBaseResourceFactory.java
+++ b/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/LocalStandaloneHBaseResourceFactory.java
@@ -18,13 +18,11 @@
package org.apache.flink.tests.util.hbase;
-/**
- * A {@link HBaseResourceFactory} for the {@link LocalStandaloneHBaseResourceFactory}.
- */
+/** A {@link HBaseResourceFactory} for the {@link LocalStandaloneHBaseResourceFactory}. */
public class LocalStandaloneHBaseResourceFactory implements HBaseResourceFactory {
- @Override
- public HBaseResource create(String version) {
- return new LocalStandaloneHBaseResource(version);
- }
+ @Override
+ public HBaseResource create(String version) {
+ return new LocalStandaloneHBaseResource(version);
+ }
}
diff --git a/flink-connector-hbase-e2e-tests/src/test/java/org/apache/flink/tests/util/hbase/SQLClientHBaseITCase.java b/flink-connector-hbase-e2e-tests/src/test/java/org/apache/flink/tests/util/hbase/SQLClientHBaseITCase.java
index 1bd51b3..b8c556d 100644
--- a/flink-connector-hbase-e2e-tests/src/test/java/org/apache/flink/tests/util/hbase/SQLClientHBaseITCase.java
+++ b/flink-connector-hbase-e2e-tests/src/test/java/org/apache/flink/tests/util/hbase/SQLClientHBaseITCase.java
@@ -65,159 +65,177 @@ import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertThat;
-/**
- * End-to-end test for the HBase connectors.
- */
+/** End-to-end test for the HBase connectors. */
@RunWith(Parameterized.class)
@Category(value = {TravisGroup1.class, PreCommit.class, FailsOnJava11.class})
public class SQLClientHBaseITCase extends TestLogger {
- private static final Logger LOG = LoggerFactory.getLogger(SQLClientHBaseITCase.class);
-
- private static final String HBASE_E2E_SQL = "hbase_e2e.sql";
-
- @Parameterized.Parameters(name = "{index}: hbase-version:{0}")
- public static Collection<Object[]> data() {
- return Arrays.asList(new Object[][]{
- {"1.4.3", "hbase-1.4"},
- {"2.2.3", "hbase-2.2"}
- });
- }
-
- @Rule
- public final HBaseResource hbase;
-
- @Rule
- public final FlinkResource flink = new LocalStandaloneFlinkResourceFactory()
- .create(FlinkResourceSetup.builder().build());
-
- @Rule
- public final TemporaryFolder tmp = new TemporaryFolder();
-
- private final String hbaseConnector;
- private final Path sqlConnectorHBaseJar;
-
- @ClassRule
- public static final DownloadCache DOWNLOAD_CACHE = DownloadCache.get();
-
- private static final Path sqlToolBoxJar = TestUtils.getResource(".*SqlToolbox.jar");
- private static final Path hadoopClasspath = TestUtils.getResource(".*hadoop.classpath");
- private List<Path> hadoopClasspathJars;
-
- public SQLClientHBaseITCase(String hbaseVersion, String hbaseConnector) {
- this.hbase = HBaseResource.get(hbaseVersion);
- this.hbaseConnector = hbaseConnector;
- this.sqlConnectorHBaseJar = TestUtils.getResource(".*sql-" + hbaseConnector + ".jar");
- }
-
- @Before
- public void before() throws Exception {
- DOWNLOAD_CACHE.before();
- Path tmpPath = tmp.getRoot().toPath();
- LOG.info("The current temporary path: {}", tmpPath);
-
- // Prepare all hadoop jars to mock HADOOP_CLASSPATH, use hadoop.classpath which contains all hadoop jars
- File hadoopClasspathFile = new File(hadoopClasspath.toAbsolutePath().toString());
-
- if (!hadoopClasspathFile.exists()) {
- throw new FileNotFoundException("File that contains hadoop classpath " + hadoopClasspath.toString()
- + " does not exist.");
- }
-
- String classPathContent = FileUtils.readFileUtf8(hadoopClasspathFile);
- hadoopClasspathJars = Arrays.stream(classPathContent.split(":"))
- .map(jar -> Paths.get(jar))
- .collect(Collectors.toList());
- }
-
- @Test
- public void testHBase() throws Exception {
- try (ClusterController clusterController = flink.startCluster(2)) {
- // Create table and put data
- hbase.createTable("source", "family1", "family2");
- hbase.createTable("sink", "family1", "family2");
- hbase.putData("source", "row1", "family1", "f1c1", "v1");
- hbase.putData("source", "row1", "family2", "f2c1", "v2");
- hbase.putData("source", "row1", "family2", "f2c2", "v3");
- hbase.putData("source", "row2", "family1", "f1c1", "v4");
- hbase.putData("source", "row2", "family2", "f2c1", "v5");
- hbase.putData("source", "row2", "family2", "f2c2", "v6");
-
- // Initialize the SQL statements from "hbase_e2e.sql" file
- Map<String, String> varsMap = new HashMap<>();
- varsMap.put("$HBASE_CONNECTOR", hbaseConnector);
- List<String> sqlLines = initializeSqlLines(varsMap);
-
- // Execute SQL statements in "hbase_e2e.sql" file
- executeSqlStatements(clusterController, sqlLines);
-
- LOG.info("Verify the sink table result.");
- // Wait until all the results flushed to the HBase sink table.
- checkHBaseSinkResult();
- LOG.info("The HBase SQL client test run successfully.");
- }
-
- }
-
- private void checkHBaseSinkResult() throws Exception {
- boolean success = false;
- final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(120));
- while (deadline.hasTimeLeft()) {
- final List<String> lines = hbase.scanTable("sink");
- if (lines.size() == 6) {
- success = true;
- assertThat(
- lines.toArray(new String[0]),
- arrayContainingInAnyOrder(
- CoreMatchers.allOf(containsString("row1"), containsString("family1"),
- containsString("f1c1"), containsString("value1")),
- CoreMatchers.allOf(containsString("row1"), containsString("family2"),
- containsString("f2c1"), containsString("v2")),
- CoreMatchers.allOf(containsString("row1"), containsString("family2"),
- containsString("f2c2"), containsString("v3")),
- CoreMatchers.allOf(containsString("row2"), containsString("family1"),
- containsString("f1c1"), containsString("value4")),
- CoreMatchers.allOf(containsString("row2"), containsString("family2"),
- containsString("f2c1"), containsString("v5")),
- CoreMatchers.allOf(containsString("row2"), containsString("family2"),
- containsString("f2c2"), containsString("v6"))
- )
- );
- break;
- } else {
- LOG.info("The HBase sink table does not contain enough records, current {} records, left time: {}s",
- lines.size(), deadline.timeLeft().getSeconds());
- }
- Thread.sleep(500);
- }
- Assert.assertTrue("Did not get expected results before timeout.", success);
- }
-
- private List<String> initializeSqlLines(Map<String, String> vars) throws IOException {
- URL url = SQLClientHBaseITCase.class.getClassLoader().getResource(HBASE_E2E_SQL);
- if (url == null) {
- throw new FileNotFoundException(HBASE_E2E_SQL);
- }
- List<String> lines = Files.readAllLines(new File(url.getFile()).toPath());
- List<String> result = new ArrayList<>();
- for (String line : lines) {
- for (Map.Entry<String, String> var : vars.entrySet()) {
- line = line.replace(var.getKey(), var.getValue());
- }
- result.add(line);
- }
-
- return result;
- }
-
- private void executeSqlStatements(ClusterController clusterController, List<String> sqlLines) throws IOException {
- LOG.info("Executing SQL: HBase source table -> HBase sink table");
- clusterController.submitSQLJob(
- new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
- .addJar(sqlToolBoxJar)
- .addJar(sqlConnectorHBaseJar)
- .addJars(hadoopClasspathJars)
- .build(),
- Duration.ofMinutes(2L));
- }
+ private static final Logger LOG = LoggerFactory.getLogger(SQLClientHBaseITCase.class);
+
+ private static final String HBASE_E2E_SQL = "hbase_e2e.sql";
+
+ @Parameterized.Parameters(name = "{index}: hbase-version:{0}")
+ public static Collection<Object[]> data() {
+ return Arrays.asList(
+ new Object[][] {
+ {"1.4.3", "hbase-1.4"},
+ {"2.2.3", "hbase-2.2"}
+ });
+ }
+
+ @Rule public final HBaseResource hbase;
+
+ @Rule
+ public final FlinkResource flink =
+ new LocalStandaloneFlinkResourceFactory().create(FlinkResourceSetup.builder().build());
+
+ @Rule public final TemporaryFolder tmp = new TemporaryFolder();
+
+ private final String hbaseConnector;
+ private final Path sqlConnectorHBaseJar;
+
+ @ClassRule public static final DownloadCache DOWNLOAD_CACHE = DownloadCache.get();
+
+ private static final Path sqlToolBoxJar = TestUtils.getResource(".*SqlToolbox.jar");
+ private static final Path hadoopClasspath = TestUtils.getResource(".*hadoop.classpath");
+ private List<Path> hadoopClasspathJars;
+
+ public SQLClientHBaseITCase(String hbaseVersion, String hbaseConnector) {
+ this.hbase = HBaseResource.get(hbaseVersion);
+ this.hbaseConnector = hbaseConnector;
+ this.sqlConnectorHBaseJar = TestUtils.getResource(".*sql-" + hbaseConnector + ".jar");
+ }
+
+ @Before
+ public void before() throws Exception {
+ DOWNLOAD_CACHE.before();
+ Path tmpPath = tmp.getRoot().toPath();
+ LOG.info("The current temporary path: {}", tmpPath);
+
+ // Prepare all hadoop jars to mock HADOOP_CLASSPATH, use hadoop.classpath which contains all
+ // hadoop jars
+ File hadoopClasspathFile = new File(hadoopClasspath.toAbsolutePath().toString());
+
+ if (!hadoopClasspathFile.exists()) {
+ throw new FileNotFoundException(
+ "File that contains hadoop classpath "
+ + hadoopClasspath.toString()
+ + " does not exist.");
+ }
+
+ String classPathContent = FileUtils.readFileUtf8(hadoopClasspathFile);
+ hadoopClasspathJars =
+ Arrays.stream(classPathContent.split(":"))
+ .map(jar -> Paths.get(jar))
+ .collect(Collectors.toList());
+ }
+
+ @Test
+ public void testHBase() throws Exception {
+ try (ClusterController clusterController = flink.startCluster(2)) {
+ // Create table and put data
+ hbase.createTable("source", "family1", "family2");
+ hbase.createTable("sink", "family1", "family2");
+ hbase.putData("source", "row1", "family1", "f1c1", "v1");
+ hbase.putData("source", "row1", "family2", "f2c1", "v2");
+ hbase.putData("source", "row1", "family2", "f2c2", "v3");
+ hbase.putData("source", "row2", "family1", "f1c1", "v4");
+ hbase.putData("source", "row2", "family2", "f2c1", "v5");
+ hbase.putData("source", "row2", "family2", "f2c2", "v6");
+
+ // Initialize the SQL statements from "hbase_e2e.sql" file
+ Map<String, String> varsMap = new HashMap<>();
+ varsMap.put("$HBASE_CONNECTOR", hbaseConnector);
+ List<String> sqlLines = initializeSqlLines(varsMap);
+
+ // Execute SQL statements in "hbase_e2e.sql" file
+ executeSqlStatements(clusterController, sqlLines);
+
+ LOG.info("Verify the sink table result.");
+ // Wait until all the results flushed to the HBase sink table.
+ checkHBaseSinkResult();
+ LOG.info("The HBase SQL client test run successfully.");
+ }
+ }
+
+ private void checkHBaseSinkResult() throws Exception {
+ boolean success = false;
+ final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(120));
+ while (deadline.hasTimeLeft()) {
+ final List<String> lines = hbase.scanTable("sink");
+ if (lines.size() == 6) {
+ success = true;
+ assertThat(
+ lines.toArray(new String[0]),
+ arrayContainingInAnyOrder(
+ CoreMatchers.allOf(
+ containsString("row1"),
+ containsString("family1"),
+ containsString("f1c1"),
+ containsString("value1")),
+ CoreMatchers.allOf(
+ containsString("row1"),
+ containsString("family2"),
+ containsString("f2c1"),
+ containsString("v2")),
+ CoreMatchers.allOf(
+ containsString("row1"),
+ containsString("family2"),
+ containsString("f2c2"),
+ containsString("v3")),
+ CoreMatchers.allOf(
+ containsString("row2"),
+ containsString("family1"),
+ containsString("f1c1"),
+ containsString("value4")),
+ CoreMatchers.allOf(
+ containsString("row2"),
+ containsString("family2"),
+ containsString("f2c1"),
+ containsString("v5")),
+ CoreMatchers.allOf(
+ containsString("row2"),
+ containsString("family2"),
+ containsString("f2c2"),
+ containsString("v6"))));
+ break;
+ } else {
+ LOG.info(
+ "The HBase sink table does not contain enough records, current {} records, left time: {}s",
+ lines.size(),
+ deadline.timeLeft().getSeconds());
+ }
+ Thread.sleep(500);
+ }
+ Assert.assertTrue("Did not get expected results before timeout.", success);
+ }
+
+ private List<String> initializeSqlLines(Map<String, String> vars) throws IOException {
+ URL url = SQLClientHBaseITCase.class.getClassLoader().getResource(HBASE_E2E_SQL);
+ if (url == null) {
+ throw new FileNotFoundException(HBASE_E2E_SQL);
+ }
+ List<String> lines = Files.readAllLines(new File(url.getFile()).toPath());
+ List<String> result = new ArrayList<>();
+ for (String line : lines) {
+ for (Map.Entry<String, String> var : vars.entrySet()) {
+ line = line.replace(var.getKey(), var.getValue());
+ }
+ result.add(line);
+ }
+
+ return result;
+ }
+
+ private void executeSqlStatements(ClusterController clusterController, List<String> sqlLines)
+ throws IOException {
+ LOG.info("Executing SQL: HBase source table -> HBase sink table");
+ clusterController.submitSQLJob(
+ new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
+ .addJar(sqlToolBoxJar)
+ .addJar(sqlConnectorHBaseJar)
+ .addJars(hadoopClasspathJars)
+ .build(),
+ Duration.ofMinutes(2L));
+ }
}