You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/09/03 08:25:22 UTC
[kylin] 01/02: KYLIN-3488 Support Mysql as Kylin metadata storage
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch 2.5.x
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 210719296973214bff7dcaf9a076d535ae7f7ddc
Author: GinaZhai <na...@kyligence.io>
AuthorDate: Fri Aug 31 15:05:21 2018 +0800
KYLIN-3488 Support Mysql as Kylin metadata storage
---
core-common/pom.xml | 39 ++
.../java/org/apache/kylin/common/KylinConfig.java | 22 +
.../org/apache/kylin/common/KylinConfigBase.java | 94 +++
.../kylin/common/persistence/BrokenEntity.java | 56 ++
.../common/persistence/BrokenInputStream.java | 58 ++
.../common/persistence/JDBCConnectionManager.java | 143 +++++
.../kylin/common/persistence/JDBCResource.java | 64 ++
.../kylin/common/persistence/JDBCResourceDAO.java | 688 +++++++++++++++++++++
.../common/persistence/JDBCResourceStore.java | 178 ++++++
.../common/persistence/JDBCSqlQueryFormat.java | 96 +++
.../persistence/JDBCSqlQueryFormatProvider.java | 53 ++
.../kylin/common/persistence/ResourceStore.java | 5 +
.../org/apache/kylin/common/util/HadoopUtil.java | 10 +
.../main/resources/metadata-jdbc-mysql.properties | 34 +
kylin-it/pom.xml | 5 +
.../storage/jdbc/ITJDBCResourceStoreTest.java | 322 ++++++++++
pom.xml | 10 +
17 files changed, 1877 insertions(+)
diff --git a/core-common/pom.xml b/core-common/pom.xml
index 6b48d65..2f1c8e2 100644
--- a/core-common/pom.xml
+++ b/core-common/pom.xml
@@ -91,5 +91,44 @@
<artifactId>dropwizard-metrics-hadoop-metrics2-reporter</artifactId>
<version>0.1.2</version>
</dependency>
+ <dependency>
+ <groupId>commons-dbcp</groupId>
+ <artifactId>commons-dbcp</artifactId>
+ <version>1.4</version>
+ </dependency>
+ <dependency>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy</id>
+ <phase>package</phase>
+ <goals>
+ <goal>copy</goal>
+ </goals>
+ <configuration>
+ <artifactItems>
+ <artifactItem>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ <overWrite>true</overWrite>
+ <outputDirectory>${project.basedir}/../../build/ext</outputDirectory>
+ </artifactItem>
+ </artifactItems>
+ <overWriteReleases>false</overWriteReleases>
+ <overWriteSnapshots>true</overWriteSnapshots>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</project>
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index e09ce26..468bef7 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -534,4 +534,26 @@ public class KylinConfig extends KylinConfigBase {
return this.base() == ((KylinConfig) another).base();
}
+    /**
+     * Returns the SQL dialect name configured under {@code kylin.metadata.jdbc.dialect},
+     * falling back to {@code "mysql"} when the property is not set.
+     * Note that the value is read from {@code SYS_ENV_INSTANCE}, not from this instance.
+     */
+    public String getMetadataDialect() {
+        return SYS_ENV_INSTANCE.getOptional("kylin.metadata.jdbc.dialect", "mysql");
+    }
+
+    /**
+     * Tells whether {@code kylin.metadata.jdbc.json-always-small-cell} is enabled
+     * (default {@code "true"}). The property is read from {@code SYS_ENV_INSTANCE}.
+     */
+    public boolean isJsonAlwaysSmallCell() {
+        final String flag = SYS_ENV_INSTANCE.getOptional("kylin.metadata.jdbc.json-always-small-cell", "true");
+        return Boolean.parseBoolean(flag);
+    }
+
+    /**
+     * Returns the value of {@code kylin.metadata.jdbc.small-cell-meta-size-warning-threshold}
+     * as an int; the default is {@code 100 << 20}. The property is read from
+     * {@code SYS_ENV_INSTANCE}.
+     */
+    public int getSmallCellMetadataWarningThreshold() {
+        final String fallback = String.valueOf(100 << 20);
+        final String configured = SYS_ENV_INSTANCE
+                .getOptional("kylin.metadata.jdbc.small-cell-meta-size-warning-threshold", fallback);
+        return Integer.parseInt(configured);
+    }
+
+ public int getSmallCellMetadataErrorThreshold() {
+ return Integer.parseInt(
+ SYS_ENV_INSTANCE.getOptional("kylin.metadata.jdbc.small-cell-meta-size-error-threshold", String.valueOf(1 << 30)));
+ }
+
+    /**
+     * Returns the value of {@code kylin.metadata.jdbc.max-cell-size} as an int;
+     * the default is 262144. The property is read from {@code SYS_ENV_INSTANCE}.
+     */
+    public int getJdbcResourceStoreMaxCellSize() {
+        return Integer.parseInt(SYS_ENV_INSTANCE.getOptional("kylin.metadata.jdbc.max-cell-size", "262144")); //256k
+    }
+
}
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 4895bf0..b8d87bc 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -226,6 +226,7 @@ abstract public class KylinConfigBase implements Serializable {
}
private String cachedHdfsWorkingDirectory;
+ private String cachedBigCellDirectory;
public String getHdfsWorkingDirectory() {
if (cachedHdfsWorkingDirectory != null)
@@ -260,6 +261,94 @@ abstract public class KylinConfigBase implements Serializable {
return cachedHdfsWorkingDirectory;
}
+    /**
+     * Resolves the directory configured by {@code kylin.env.hdfs-metastore-bigcell-dir}.
+     * <p>
+     * When the property is absent the result of {@link #getJdbcHdfsWorkingDirectory()} is
+     * returned (and not cached). Otherwise the configured path must be absolute; it is
+     * qualified against the read file system, suffixed with the metadata URL prefix
+     * (':' replaced by '-') and a trailing '/', and cached for later calls.
+     *
+     * @throws IllegalArgumentException if the configured path is not absolute
+     * @throws RuntimeException wrapping the IOException raised while qualifying the path
+     */
+    public String getMetastoreBigCellHdfsDirectory() {
+        if (cachedBigCellDirectory != null) {
+            return cachedBigCellDirectory;
+        }
+
+        final String configured = getOptional("kylin.env.hdfs-metastore-bigcell-dir");
+        if (configured == null) {
+            return getJdbcHdfsWorkingDirectory();
+        }
+
+        Path bigCellPath = new Path(configured);
+        if (!bigCellPath.isAbsolute()) {
+            throw new IllegalArgumentException(
+                    "kylin.env.hdfs-metastore-bigcell-dir must be absolute, but got " + configured);
+        }
+
+        // make sure path is qualified
+        try {
+            bigCellPath = HadoopUtil.getReadFileSystem().makeQualified(bigCellPath);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        String dir = new Path(bigCellPath, StringUtils.replaceChars(getMetadataUrlPrefix(), ':', '-')).toString();
+        if (!dir.endsWith("/")) {
+            dir = dir + "/";
+        }
+
+        // expand the single-colon scheme prefix to the "scheme://" form
+        if (dir.startsWith("file:")) {
+            dir = dir.replace("file:", "file://");
+        } else if (dir.startsWith("maprfs:")) {
+            dir = dir.replace("maprfs:", "maprfs://");
+        }
+
+        cachedBigCellDirectory = dir;
+        return cachedBigCellDirectory;
+    }
+
+ private String getJdbcHdfsWorkingDirectory() {
+ if (StringUtils.isNotEmpty(getJdbcFileSystem())) {
+ Path workingDir = new Path(getReadHdfsWorkingDirectory());
+ return new Path(getJdbcFileSystem(), Path.getPathWithoutSchemeAndAuthority(workingDir)).toString() + "/";
+ }
+
+ return getReadHdfsWorkingDirectory();
+ }
+
+    /**
+     * Returns the value of {@code kylin.storage.columnar.jdbc.file-system}, or an empty
+     * string when it is not set.
+     * <p>
+     * Consider using {@code kylin.env.hdfs-metastore-bigcell-dir} instead of
+     * {@code kylin.storage.columnar.jdbc.file-system}.
+     */
+    private String getJdbcFileSystem() {
+        return getOptional("kylin.storage.columnar.jdbc.file-system", "");
+    }
+
+    /**
+     * Returns the HDFS working directory for the given project.
+     *
+     * @param project project name; may be null
+     * @return the project sub-directory (with trailing '/') when project isolation is
+     *         enabled and {@code project} is non-null, otherwise the global working directory
+     */
+    public String getHdfsWorkingDirectory(String project) {
+        if (!isProjectIsolationEnabled() || project == null) {
+            return getHdfsWorkingDirectory();
+        }
+        final Path projectDir = new Path(getHdfsWorkingDirectory(), project);
+        return projectDir.toString() + "/";
+    }
+
+ private String getReadHdfsWorkingDirectory() {
+ if (StringUtils.isNotEmpty(getParquetReadFileSystem())) {
+ Path workingDir = new Path(getHdfsWorkingDirectory());
+ return new Path(getParquetReadFileSystem(), Path.getPathWithoutSchemeAndAuthority(workingDir)).toString()
+ + "/";
+ }
+
+ return getHdfsWorkingDirectory();
+ }
+
+    /**
+     * Project-aware variant: returns {@link #getHdfsWorkingDirectory(String)} for the
+     * project, re-rooted onto the file system named by {@link #getParquetReadFileSystem()}
+     * when that setting is non-empty.
+     *
+     * @param project project name, passed through to {@link #getHdfsWorkingDirectory(String)}
+     */
+    public String getReadHdfsWorkingDirectory(String project) {
+        final String readFs = getParquetReadFileSystem();
+        if (StringUtils.isNotEmpty(readFs)) {
+            final Path workingDir = new Path(getHdfsWorkingDirectory(project));
+            final Path relocated = new Path(readFs, Path.getPathWithoutSchemeAndAuthority(workingDir));
+            return relocated.toString() + "/";
+        }
+
+        return getHdfsWorkingDirectory(project);
+    }
+
+    /**
+     * Returns the value of {@code kylin.storage.columnar.file-system}, or an empty
+     * string when it is not set.
+     */
+    public String getParquetReadFileSystem() {
+        return getOptional("kylin.storage.columnar.file-system", "");
+    }
+
public String getZookeeperBasePath() {
return getOptional("kylin.env.zookeeper-base-path", "/kylin");
}
@@ -323,6 +412,7 @@ abstract public class KylinConfigBase implements Serializable {
r.put("hbase", "org.apache.kylin.storage.hbase.HBaseResourceStore");
r.put("hdfs", "org.apache.kylin.common.persistence.HDFSResourceStore");
r.put("ifile", "org.apache.kylin.common.persistence.IdentifierFileResourceStore");
+ r.put("jdbc", "org.apache.kylin.common.persistence.JDBCResourceStore");
r.putAll(getPropertiesByPrefix("kylin.metadata.resource-store-provider.")); // note the naming convention -- http://kylin.apache.org/development/coding_naming_convention.html
return r;
}
@@ -1309,6 +1399,10 @@ abstract public class KylinConfigBase implements Serializable {
return Boolean.parseBoolean(getOptional("kylin.query.stream-aggregate-enabled", "true"));
}
+    /**
+     * Tells whether {@code kylin.storage.project-isolation-enable} is set to true
+     * (it defaults to {@code "true"}).
+     */
+    public boolean isProjectIsolationEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.storage.project-isolation-enable", "true"));
+    }
+
@Deprecated //Limit is good even it's large. This config is meaning less since we already have scan threshold
public int getStoragePushDownLimitMax() {
return Integer.parseInt(getOptional("kylin.query.max-limit-pushdown", "10000"));
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/BrokenEntity.java b/core-common/src/main/java/org/apache/kylin/common/persistence/BrokenEntity.java
new file mode 100644
index 0000000..6e1b4c2
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/BrokenEntity.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.persistence;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Persistent entity that carries a resource path together with an error message.
+ * Both fields are exposed as JSON properties ({@code resPath}, {@code errorMsg}).
+ */
+public class BrokenEntity extends RootPersistentEntity {
+
+    // Byte marker "BROKEN"; BrokenInputStream writes it in front of the serialized entity.
+    public static final byte[] MAGIC = new byte[]{'B', 'R', 'O', 'K', 'E', 'N'};
+
+    // Path of the resource this entity stands for.
+    @JsonProperty("resPath")
+    private String resPath;
+
+    // Message describing the error associated with the resource.
+    @JsonProperty("errorMsg")
+    private String errorMsg;
+
+    /** No-argument constructor; leaves both fields null. */
+    public BrokenEntity() {
+    }
+
+    /**
+     * @param resPath path of the resource
+     * @param errorMsg error message to record for that resource
+     */
+    public BrokenEntity(String resPath, String errorMsg) {
+        this.resPath = resPath;
+        this.errorMsg = errorMsg;
+    }
+
+    /** @return the recorded resource path */
+    public String getResPath() {
+        return resPath;
+    }
+
+    /** @param resPath the resource path to record */
+    public void setResPath(String resPath) {
+        this.resPath = resPath;
+    }
+
+    /** @return the recorded error message */
+    public String getErrorMsg() {
+        return errorMsg;
+    }
+
+    /** @param errorMsg the error message to record */
+    public void setErrorMsg(String errorMsg) {
+        this.errorMsg = errorMsg;
+    }
+}
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/BrokenInputStream.java b/core-common/src/main/java/org/apache/kylin/common/persistence/BrokenInputStream.java
new file mode 100644
index 0000000..9eddba8
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/BrokenInputStream.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.persistence;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.kylin.common.util.JsonUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+public class BrokenInputStream extends InputStream {
+ private static Logger logger = LoggerFactory.getLogger(BrokenInputStream.class);
+ private final ByteArrayInputStream in;
+
+ public BrokenInputStream(BrokenEntity brokenEntity) {
+ final ByteArrayOutputStream out = new ByteArrayOutputStream();
+ try {
+ IOUtils.write(BrokenEntity.MAGIC, out);
+ IOUtils.write(JsonUtil.writeValueAsBytes(brokenEntity), out);
+ } catch (IOException e) {
+ logger.error("There is something error when we serialize BrokenEntity: ", e);
+ throw new RuntimeException("There is something error when we serialize BrokenEntity.");
+ }
+
+ in = new ByteArrayInputStream(out.toByteArray());
+ }
+
+ @Override
+ public int read() {
+ return in.read();
+ }
+
+ @Override
+ public void close() throws IOException {
+ in.close();
+ super.close();
+ }
+}
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCConnectionManager.java b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCConnectionManager.java
new file mode 100644
index 0000000..753601a
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCConnectionManager.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.persistence;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.dbcp.BasicDataSourceFactory;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.StorageURL;
+import org.apache.kylin.common.util.EncryptUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Process-wide holder of the DBCP connection pool used for JDBC metadata storage.
+ * The pool settings come from the parameters of the metadata URL of
+ * {@code KylinConfig.getInstanceFromEnv()}.
+ */
+public class JDBCConnectionManager {
+
+    private static final Logger logger = LoggerFactory.getLogger(JDBCConnectionManager.class);
+
+    // volatile is needed so the lazily created instance is safely published to threads
+    // that read it outside the synchronized block in getConnectionManager()
+    private static volatile JDBCConnectionManager INSTANCE = null;
+
+    private static final Object lock = new Object();
+
+    /**
+     * Returns the shared manager, creating it on first use (or on first use after
+     * {@link #close()} was called on the previous one).
+     *
+     * @throws RuntimeException if the pool cannot be created or the first connection fails
+     */
+    public static JDBCConnectionManager getConnectionManager() {
+        JDBCConnectionManager current = INSTANCE;
+        if (current == null) {
+            synchronized (lock) {
+                current = INSTANCE;
+                if (current == null) {
+                    current = new JDBCConnectionManager(KylinConfig.getInstanceFromEnv());
+                    INSTANCE = current;
+                }
+            }
+        }
+        return current;
+    }
+
+    // ============================================================================
+
+    private final Map<String, String> dbcpProps;
+    private final DataSource dataSource;
+
+    /**
+     * Builds the pool and opens one connection to log the database product and version.
+     */
+    private JDBCConnectionManager(KylinConfig config) {
+        try {
+            this.dbcpProps = initDbcpProps(config);
+
+            this.dataSource = BasicDataSourceFactory.createDataSource(getDbcpProperties());
+            Connection conn = getConn();
+            try {
+                DatabaseMetaData mdm = conn.getMetaData();
+                logger.info("Connected to " + mdm.getDatabaseProductName() + " " + mdm.getDatabaseProductVersion());
+            } finally {
+                // return the probe connection to the pool even if the metadata lookup fails
+                closeQuietly(conn);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Derives the DBCP properties from the metadata URL parameters: verifies that
+     * {@code url}, {@code username} and {@code password} are present, decrypts the
+     * password when {@code passwordEncrypted=true}, and fills in pool defaults for any
+     * setting the URL does not provide.
+     */
+    private Map<String, String> initDbcpProps(KylinConfig config) {
+        // metadataUrl is like "kylin_default_instance@jdbc,url=jdbc:mysql://localhost:3306/kylin,username=root,password=xxx"
+        StorageURL metadataUrl = config.getMetadataUrl();
+        JDBCResourceStore.checkScheme(metadataUrl);
+
+        LinkedHashMap<String, String> ret = new LinkedHashMap<>(metadataUrl.getAllParameters());
+        List<String> mandatoryItems = Arrays.asList("url", "username", "password");
+
+        for (String item : mandatoryItems) {
+            Preconditions.checkNotNull(ret.get(item),
+                    "Setting item \"" + item + "\" is mandatory for Jdbc connections.");
+        }
+
+        // Check whether password encrypted
+        if ("true".equals(ret.get("passwordEncrypted"))) {
+            String password = ret.get("password");
+            ret.put("password", EncryptUtil.decrypt(password));
+            ret.remove("passwordEncrypted");
+        }
+
+        logger.info("Connecting to Jdbc with url:" + ret.get("url") + " by user " + ret.get("username"));
+
+        putIfMissing(ret, "driverClassName", "com.mysql.jdbc.Driver");
+        putIfMissing(ret, "maxActive", "5");
+        putIfMissing(ret, "maxIdle", "5");
+        putIfMissing(ret, "maxWait", "1000");
+        putIfMissing(ret, "removeAbandoned", "true");
+        putIfMissing(ret, "removeAbandonedTimeout", "180");
+        putIfMissing(ret, "testOnBorrow", "true");
+        putIfMissing(ret, "testWhileIdle", "true");
+        putIfMissing(ret, "validationQuery", "select 1");
+        return ret;
+    }
+
+    /** Stores {@code value} under {@code key} only when the map has no entry for the key yet. */
+    private void putIfMissing(LinkedHashMap<String, String> map, String key, String value) {
+        if (!map.containsKey(key))
+            map.put(key, value);
+    }
+
+    /**
+     * Obtains a connection from the data source; the caller is responsible for closing it.
+     *
+     * @throws SQLException if the data source cannot provide a connection
+     */
+    public final Connection getConn() throws SQLException {
+        return dataSource.getConnection();
+    }
+
+    /** Returns a fresh {@link Properties} copy of the DBCP settings. */
+    public Properties getDbcpProperties() {
+        Properties ret = new Properties();
+        ret.putAll(dbcpProps);
+        return ret;
+    }
+
+    /**
+     * Closes the given resource if it is non-null; a failure is logged as a warning
+     * instead of being thrown.
+     */
+    public static void closeQuietly(AutoCloseable obj) {
+        if (obj != null) {
+            try {
+                obj.close();
+            } catch (Exception e) {
+                logger.warn("Error closing " + obj, e);
+            }
+        }
+    }
+
+    /**
+     * Closes the underlying data source; a failure is logged, not thrown. If this object
+     * is the shared instance it is also forgotten, so that a later
+     * {@link #getConnectionManager()} builds a new pool instead of handing out this
+     * closed one.
+     */
+    public void close() {
+        synchronized (lock) {
+            if (INSTANCE == this) {
+                INSTANCE = null;
+            }
+        }
+        try {
+            ((org.apache.commons.dbcp.BasicDataSource) dataSource).close();
+        } catch (SQLException e) {
+            logger.error("error closing data source", e);
+        }
+    }
+}
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResource.java b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResource.java
new file mode 100644
index 0000000..f9c3000
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResource.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.kylin.common.persistence;
+
+import java.io.InputStream;
+
+/**
+ * Simple holder for one resource: its path, a timestamp and its content as a stream.
+ */
+public class JDBCResource {
+    // Resource path.
+    private String path;
+
+    // Timestamp associated with the resource.
+    private long timestamp;
+
+    // Resource content; this class never closes the stream itself.
+    private InputStream content;
+
+    /** Creates an empty holder: null path, zero timestamp, null content. */
+    public JDBCResource() {
+
+    }
+
+    /**
+     * @param path resource path
+     * @param timestamp timestamp of the resource
+     * @param content stream over the resource content
+     */
+    public JDBCResource(String path, long timestamp, InputStream content) {
+        this.path = path;
+        this.timestamp = timestamp;
+        this.content = content;
+    }
+
+    /** @return the resource path */
+    public String getPath() {
+        return path;
+    }
+
+    /** @param path the resource path */
+    public void setPath(String path) {
+        this.path = path;
+    }
+
+    /** @return the resource timestamp */
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    /** @param timestamp the resource timestamp */
+    public void setTimestamp(long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    /** @return the stream over the resource content */
+    public InputStream getContent() {
+        return content;
+    }
+
+    /** @param content the stream over the resource content */
+    public void setContent(InputStream content) {
+        this.content = content;
+    }
+}
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceDAO.java b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceDAO.java
new file mode 100644
index 0000000..f267530
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceDAO.java
@@ -0,0 +1,688 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.persistence;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.text.MessageFormat;
+import java.util.List;
+import java.util.TreeSet;
+
+public class JDBCResourceDAO {
+
+ private static Logger logger = LoggerFactory.getLogger(JDBCResourceDAO.class);
+
+ private static final String META_TABLE_KEY = "META_TABLE_KEY";
+
+ private static final String META_TABLE_TS = "META_TABLE_TS";
+
+ private static final String META_TABLE_CONTENT = "META_TABLE_CONTENT";
+
+ private JDBCConnectionManager connectionManager;
+
+ private JDBCSqlQueryFormat jdbcSqlQueryFormat;
+
+ private String[] tablesName;
+
+ private KylinConfig kylinConfig;
+
+ // For test
+ private long queriedSqlNum = 0;
+
+ public JDBCResourceDAO(KylinConfig kylinConfig, String[] tablesName) throws SQLException {
+ this.kylinConfig = kylinConfig;
+ this.connectionManager = JDBCConnectionManager.getConnectionManager();
+ this.jdbcSqlQueryFormat = JDBCSqlQueryFormatProvider.createJDBCSqlQueriesFormat(kylinConfig.getMetadataDialect());
+ this.tablesName = tablesName;
+ for (int i = 0; i < tablesName.length; i++) {
+ createTableIfNeeded(tablesName[i]);
+ createIndex("IDX_" + META_TABLE_TS, tablesName[i], META_TABLE_TS);
+ }
+ }
+
+    /**
+     * Closes the connection manager used by this DAO. That manager was obtained from
+     * {@code JDBCConnectionManager.getConnectionManager()} in the constructor.
+     */
+    public void close() {
+        connectionManager.close();
+    }
+
+    /**
+     * Convenience overload of the four-argument {@code getResource} that passes
+     * {@code false} for {@code isAllowBroken}.
+     */
+    public JDBCResource getResource(final String resourcePath, final boolean fetchContent, final boolean fetchTimestamp)
+            throws SQLException {
+        return getResource(resourcePath, fetchContent, fetchTimestamp, false);
+    }
+
+    /**
+     * Looks up a single resource row by its exact path.
+     *
+     * @param resourcePath path (key) of the resource
+     * @param fetchContent whether to load the content stream
+     * @param fetchTimestamp whether to load the timestamp
+     * @param isAllowBroken when true, a failure while loading the content does not fail the
+     *        call; the resource content is replaced by a {@link BrokenInputStream}
+     *        describing the error
+     * @return the resource, or null when no row matches the path
+     * @throws SQLException on a database error, or wrapping a content-loading failure when
+     *         {@code isAllowBroken} is false
+     */
+    public JDBCResource getResource(final String resourcePath, final boolean fetchContent, final boolean fetchTimestamp,
+            final boolean isAllowBroken) throws SQLException {
+        final JDBCResource resource = new JDBCResource();
+        logger.trace("getResource method. resourcePath : {} , fetchContent : {} , fetch TS : {}", resourcePath,
+                fetchContent, fetchTimestamp);
+        executeSql(new SqlOperation() {
+            @Override
+            public void execute(Connection connection) throws SQLException {
+                String tableName = getMetaTableName(resourcePath);
+                pstat = connection.prepareStatement(getKeyEqualSqlString(tableName, fetchContent, fetchTimestamp));
+                pstat.setString(1, resourcePath);
+                rs = pstat.executeQuery();
+                if (!rs.next()) {
+                    // no such row: the path stays null and the caller gets null
+                    return;
+                }
+                resource.setPath(rs.getString(META_TABLE_KEY));
+                if (fetchTimestamp)
+                    resource.setTimestamp(rs.getLong(META_TABLE_TS));
+                if (fetchContent) {
+                    try {
+                        resource.setContent(getInputStream(resourcePath, rs));
+                    } catch (Throwable e) {
+                        if (!isAllowBroken) {
+                            throw new SQLException(e);
+                        }
+
+                        final BrokenEntity brokenEntity = new BrokenEntity(resourcePath, e.getMessage());
+                        resource.setContent(new BrokenInputStream(brokenEntity));
+                        // log the path and the full stack trace, not just the bare message
+                        logger.warn("Failed to load content of resource " + resourcePath
+                                + ", a broken entity is returned instead", e);
+                    }
+                }
+            }
+        });
+        return resource.getPath() != null ? resource : null;
+    }
+
+    /**
+     * Tells whether a row exists for the given path; neither content nor timestamp is
+     * requested from {@code getResource}.
+     */
+    public boolean existResource(final String resourcePath) throws SQLException {
+        JDBCResource resource = getResource(resourcePath, false, false);
+        return (resource != null);
+    }
+
+    /**
+     * Returns the stored timestamp of the given path, or 0 when the resource does not exist.
+     */
+    public long getResourceTimestamp(final String resourcePath) throws SQLException {
+        JDBCResource resource = getResource(resourcePath, false, true);
+        return resource == null ? 0 : resource.getTimestamp();
+    }
+
+ //fetch primary key only
+    /**
+     * Lists resource paths that start with {@code folderPath}; only the key column is read.
+     *
+     * @param folderPath path prefix, bound to the query as {@code folderPath + "%"}
+     * @param recursive when true every matching path is returned as is; when false each
+     *        path is cut at the first '/' found at or after {@code folderPath.length()}
+     * @return the sorted set of resulting paths
+     */
+    //fetch primary key only
+    public TreeSet<String> listAllResource(final String folderPath, final boolean recursive) throws SQLException {
+        final TreeSet<String> names = new TreeSet<>();
+        executeSql(new SqlOperation() {
+            @Override
+            public void execute(Connection connection) throws SQLException {
+                final String tableName = getMetaTableName(folderPath);
+                pstat = connection.prepareStatement(getListResourceSqlString(tableName));
+                pstat.setString(1, folderPath + "%");
+                rs = pstat.executeQuery();
+                while (rs.next()) {
+                    final String path = rs.getString(META_TABLE_KEY);
+                    assert path.startsWith(folderPath);
+                    if (recursive) {
+                        names.add(path);
+                        continue;
+                    }
+                    // non-recursive: keep only the part up to the next path separator
+                    final int cut = path.indexOf('/', folderPath.length());
+                    if (cut < 0) {
+                        names.add(path);
+                    } else {
+                        names.add(path.substring(0, cut));
+                    }
+                }
+            }
+        });
+        return names;
+    }
+
+    /**
+     * Loads the direct children of a folder together with their timestamp and content.
+     *
+     * @param folderPath folder prefix, bound to the query as {@code folderPath + "%"}
+     * @param timeStart second query parameter (timestamp lower bound)
+     * @param timeEndExclusive third query parameter (timestamp upper bound)
+     * @param isAllowBroken when true, a failure while loading one resource's content does
+     *        not fail the call; that resource gets a {@link BrokenInputStream} as content
+     * @return the matching resources that {@code checkPath} accepts as direct children
+     * @throws SQLException on a database error, or wrapping a content-loading failure when
+     *         {@code isAllowBroken} is false
+     */
+    public List<JDBCResource> getAllResource(final String folderPath, final long timeStart, final long timeEndExclusive,
+            final boolean isAllowBroken) throws SQLException {
+        final List<JDBCResource> allResource = Lists.newArrayList();
+        executeSql(new SqlOperation() {
+            @Override
+            public void execute(Connection connection) throws SQLException {
+                String tableName = getMetaTableName(folderPath);
+                pstat = connection.prepareStatement(getAllResourceSqlString(tableName));
+                pstat.setString(1, folderPath + "%");
+                pstat.setLong(2, timeStart);
+                pstat.setLong(3, timeEndExclusive);
+                rs = pstat.executeQuery();
+                while (rs.next()) {
+                    String resPath = rs.getString(META_TABLE_KEY);
+                    if (!checkPath(folderPath, resPath)) {
+                        // nested deeper than one level below the folder: skip
+                        continue;
+                    }
+                    JDBCResource resource = new JDBCResource();
+                    resource.setPath(resPath);
+                    resource.setTimestamp(rs.getLong(META_TABLE_TS));
+                    try {
+                        resource.setContent(getInputStream(resPath, rs));
+                    } catch (Throwable e) {
+                        if (!isAllowBroken) {
+                            throw new SQLException(e);
+                        }
+
+                        final BrokenEntity brokenEntity = new BrokenEntity(resPath, e.getMessage());
+                        resource.setContent(new BrokenInputStream(brokenEntity));
+                        // log the path and the full stack trace, not just the bare message
+                        logger.warn("Failed to load content of resource " + resPath
+                                + ", a broken entity is returned instead", e);
+                    }
+                    allResource.add(resource);
+                }
+            }
+        });
+        return allResource;
+    }
+
+ private boolean checkPath(String lookForPrefix, String resPath) {
+ lookForPrefix = lookForPrefix.endsWith("/") ? lookForPrefix : lookForPrefix + "/";
+ assert resPath.startsWith(lookForPrefix);
+ int cut = resPath.indexOf('/', lookForPrefix.length());
+ return (cut < 0);
+ }
+
+ private boolean isJsonMetadata(String resourcePath) {
+ String trim = resourcePath.trim();
+ return trim.endsWith(".json") || trim.startsWith(ResourceStore.EXECUTE_RESOURCE_ROOT)
+ || trim.startsWith(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT);
+
+ }
+
+    /**
+     * Deletes the row of the given path and, unless the path is classified as JSON
+     * metadata by {@code isJsonMetadata}, also removes its HDFS file if one exists.
+     *
+     * @throws SQLException on a database error, or wrapping any failure of the HDFS cleanup
+     */
+    public void deleteResource(final String resourcePath) throws SQLException {
+
+        // classified before touching the database, as in the row deletion below
+        final boolean jsonMetadata = isJsonMetadata(resourcePath);
+
+        executeSql(new SqlOperation() {
+            @Override
+            public void execute(Connection connection) throws SQLException {
+                final String tableName = getMetaTableName(resourcePath);
+                pstat = connection.prepareStatement(getDeletePstatSql(tableName));
+                pstat.setString(1, resourcePath);
+                pstat.executeUpdate();
+            }
+        });
+
+        if (jsonMetadata) {
+            return;
+        }
+
+        try {
+            deleteHDFSResourceIfExist(resourcePath);
+        } catch (Throwable e) {
+            throw new SQLException(e);
+        }
+    }
+
+ private void deleteHDFSResourceIfExist(String resourcePath) throws IOException {
+
+ Path redirectPath = bigCellHDFSPath(resourcePath);
+ FileSystem fileSystem = HadoopUtil.getFileSystem(redirectPath);
+
+ if (fileSystem.exists(redirectPath)) {
+ fileSystem.delete(redirectPath, true);
+ }
+
+ }
+
+    /**
+     * Inserts the resource, or replaces the existing row with the same path, without any
+     * timestamp check. Content for which {@code isContentOverflow} returns true is stored
+     * with a NULL blob in the table and handed to {@code writeLargeCellToHdfs} instead.
+     *
+     * @param resource path, timestamp and content to store; the content stream is read fully
+     * @throws SQLException on a database error or when the content cannot be read
+     */
+    public void putResource(final JDBCResource resource) throws SQLException {
+        executeSql(new SqlOperation() {
+            @Override
+            public void execute(Connection connection) throws SQLException {
+                byte[] content = getResourceDataBytes(resource);
+                // lock on the interned path: writers of the same path in this JVM run one at a time
+                synchronized (resource.getPath().intern()) {
+                    boolean existing = existResource(resource.getPath());
+                    String tableName = getMetaTableName(resource.getPath());
+                    if (existing) {
+                        // replace statement parameters: 1 = timestamp, 2 = content, 3 = path
+                        pstat = connection.prepareStatement(getReplaceSql(tableName));
+                        pstat.setLong(1, resource.getTimestamp());
+                        pstat.setBlob(2, new BufferedInputStream(new ByteArrayInputStream(content)));
+                        pstat.setString(3, resource.getPath());
+                    } else {
+                        // insert statement parameters: 1 = path, 2 = timestamp, 3 = content
+                        pstat = connection.prepareStatement(getInsertSql(tableName));
+                        pstat.setString(1, resource.getPath());
+                        pstat.setLong(2, resource.getTimestamp());
+                        pstat.setBlob(3, new BufferedInputStream(new ByteArrayInputStream(content)));
+                    }
+
+                    if (isContentOverflow(content, resource.getPath())) {
+                        logger.debug("Overflow! resource path: {}, content size: {}, timeStamp: {}", resource.getPath(),
+                                content.length, resource.getTimestamp());
+                        // overwrite the content parameter bound above with NULL
+                        if (existing) {
+                            pstat.setNull(2, Types.BLOB);
+                        } else {
+                            pstat.setNull(3, Types.BLOB);
+                        }
+                        // the HDFS write happens before the row update; it is rolled back if the update fails
+                        writeLargeCellToHdfs(resource.getPath(), content);
+                        try {
+                            int result = pstat.executeUpdate();
+                            if (result != 1)
+                                throw new SQLException();
+                        } catch (SQLException e) {
+                            rollbackLargeCellFromHdfs(resource.getPath());
+                            throw e;
+                        }
+                        // NOTE(review): presumably removes the previous HDFS copy of the cell - confirm against the helper
+                        if (existing) {
+                            cleanOldLargeCellFromHdfs(resource.getPath());
+                        }
+                    } else {
+                        pstat.executeUpdate();
+                    }
+                }
+            }
+        });
+    }
+
+    /**
+     * Writes the content of a resource only if its stored timestamp still equals
+     * {@code oldTS}. A resource that does not exist yet is inserted, and {@code oldTS}
+     * must then be 0.
+     *
+     * @param resPath path (key) of the resource
+     * @param content new content
+     * @param oldTS timestamp the caller expects to be stored currently; 0 for a new resource
+     * @param newTS timestamp to store with the new content
+     * @throws WriteConflictException if the row exists but the timestamp update did not
+     *         affect exactly one row
+     * @throws IllegalStateException if the resource does not exist and {@code oldTS} is not 0
+     * @throws SQLException on a database error
+     */
+    public void checkAndPutResource(final String resPath, final byte[] content, final long oldTS, final long newTS)
+            throws SQLException, WriteConflictException {
+        logger.trace(
+                "execute checkAndPutResource method. resPath : {} , oldTs : {} , newTs : {} , content null ? : {} ",
+                resPath, oldTS, newTS, content == null);
+        executeSql(new SqlOperation() {
+            @Override
+            public void execute(Connection connection) throws SQLException {
+                // lock on the interned path: writers of the same path in this JVM run one at a time
+                synchronized (resPath.intern()) {
+                    String tableName = getMetaTableName(resPath);
+                    if (!existResource(resPath)) {
+                        if (oldTS != 0) {
+                            throw new IllegalStateException(
+                                    "For not exist file. OldTS have to be 0. but Actual oldTS is : " + oldTS);
+                        }
+                        if (isContentOverflow(content, resPath)) {
+                            logger.debug("Overflow! resource path: {}, content size: {}", resPath, content.length);
+                            // insert only path and timestamp; the content goes to HDFS
+                            pstat = connection.prepareStatement(getInsertSqlWithoutContent(tableName));
+                            pstat.setString(1, resPath);
+                            pstat.setLong(2, newTS);
+                            // the HDFS write happens before the insert; it is rolled back if the insert fails
+                            writeLargeCellToHdfs(resPath, content);
+                            try {
+                                int result = pstat.executeUpdate();
+                                if (result != 1)
+                                    throw new SQLException();
+                            } catch (SQLException e) {
+                                rollbackLargeCellFromHdfs(resPath);
+                                throw e;
+                            }
+                        } else {
+                            pstat = connection.prepareStatement(getInsertSql(tableName));
+                            pstat.setString(1, resPath);
+                            pstat.setLong(2, newTS);
+                            pstat.setBlob(3, new BufferedInputStream(new ByteArrayInputStream(content)));
+                            pstat.executeUpdate();
+                        }
+                    } else {
+                        // Note the checkAndPut trick:
+                        // update {0} set {1}=? where {2}=? and {3}=?
+                        pstat = connection.prepareStatement(getUpdateSqlWithoutContent(tableName));
+                        pstat.setLong(1, newTS);
+                        pstat.setString(2, resPath);
+                        pstat.setLong(3, oldTS);
+                        int result = pstat.executeUpdate();
+                        // anything other than exactly one updated row is reported as a write conflict
+                        if (result != 1) {
+                            long realTime = getResourceTimestamp(resPath);
+                            throw new WriteConflictException("Overwriting conflict " + resPath + ", expect old TS "
+                                    + oldTS + ", but it is " + realTime);
+                        }
+                        // the timestamp is updated; the content is written with a second statement
+                        PreparedStatement pstat2 = null;
+                        try {
+                            // "update {0} set {1}=? where {3}=?"
+                            pstat2 = connection.prepareStatement(getUpdateContentSql(tableName));
+                            if (isContentOverflow(content, resPath)) {
+                                logger.debug("Overflow! resource path: {}, content size: {}", resPath, content.length);
+                                pstat2.setNull(1, Types.BLOB);
+                                pstat2.setString(2, resPath);
+                                writeLargeCellToHdfs(resPath, content);
+                                try {
+                                    int result2 = pstat2.executeUpdate();
+                                    if (result2 != 1)
+                                        throw new SQLException();
+                                } catch (SQLException e) {
+                                    rollbackLargeCellFromHdfs(resPath);
+                                    throw e;
+                                }
+                                // NOTE(review): presumably removes the previous HDFS copy of the cell - confirm against the helper
+                                cleanOldLargeCellFromHdfs(resPath);
+                            } else {
+                                pstat2.setBinaryStream(1, new BufferedInputStream(new ByteArrayInputStream(content)));
+                                pstat2.setString(2, resPath);
+                                pstat2.executeUpdate();
+                            }
+                        } finally {
+                            // pstat2 is local to this branch, so it is closed here
+                            JDBCConnectionManager.closeQuietly(pstat2);
+                        }
+                    }
+                }
+            }
+        });
+    }
+
+ private byte[] getResourceDataBytes(JDBCResource resource) throws SQLException {
+ ByteArrayOutputStream bout = null;
+ try {
+ bout = new ByteArrayOutputStream();
+ IOUtils.copy(resource.getContent(), bout);
+ return bout.toByteArray();
+ } catch (Throwable e) {
+ throw new SQLException(e);
+ } finally {
+ IOUtils.closeQuietly(bout);
+ }
+ }
+
+ private boolean isContentOverflow(byte[] content, String resPath) throws SQLException {
+ if (kylinConfig.isJsonAlwaysSmallCell() && isJsonMetadata(resPath)) {
+
+ int smallCellMetadataWarningThreshold = kylinConfig.getSmallCellMetadataWarningThreshold();
+ int smallCellMetadataErrorThreshold = kylinConfig.getSmallCellMetadataErrorThreshold();
+
+ if (content.length > smallCellMetadataWarningThreshold) {
+ logger.warn(
+ "A JSON metadata entry's size is not supposed to exceed kylin.metadata.jdbc.small-cell-meta-size-warning-threshold("
+ + smallCellMetadataWarningThreshold + "), resPath: " + resPath + ", actual size: "
+ + content.length);
+ }
+ if (content.length > smallCellMetadataErrorThreshold) {
+ throw new SQLException(new IllegalArgumentException(
+ "A JSON metadata entry's size is not supposed to exceed kylin.metadata.jdbc.small-cell-meta-size-error-threshold("
+ + smallCellMetadataErrorThreshold + "), resPath: " + resPath + ", actual size: "
+ + content.length));
+ }
+
+ return false;
+ }
+
+ int maxSize = kylinConfig.getJdbcResourceStoreMaxCellSize();
+ if (content.length > maxSize)
+ return true;
+ else
+ return false;
+ }
+
+    /**
+     * Creates the metadata table {@code tableName} unless the table-existence query already lists it.
+     *
+     * @param tableName name of the table to create
+     * @throws SQLException if the existence check or the create statement fails
+     */
+    private void createTableIfNeeded(final String tableName) throws SQLException {
+        executeSql(new SqlOperation() {
+            @Override
+            public void execute(Connection connection) throws SQLException {
+                if (checkTableExists(tableName, connection)) {
+                    logger.info("Table [{}] already exists", tableName);
+                    return;
+                }
+
+                pstat = connection.prepareStatement(getCreateIfNeededSql(tableName));
+                pstat.executeUpdate();
+                logger.info("Create table [{}] success", tableName);
+            }
+
+            /**
+             * Runs the table-existence query and looks for an exact match of {@code tableName} in its first
+             * column. The statement and result set are local to this check and are closed here, because
+             * executeSql only closes the operation's own pstat/rs fields.
+             */
+            private boolean checkTableExists(final String tableName, final Connection connection) throws SQLException {
+                PreparedStatement ps = null;
+                ResultSet tables = null;
+                try {
+                    ps = connection.prepareStatement(getCheckTableExistsSql(tableName));
+                    tables = ps.executeQuery();
+                    while (tables.next()) {
+                        if (tableName.equals(tables.getString(1))) {
+                            return true;
+                        }
+                    }
+                    return false;
+                } finally {
+                    JDBCConnectionManager.closeQuietly(tables);
+                    JDBCConnectionManager.closeQuietly(ps);
+                }
+            }
+        });
+    }
+
+ private void createIndex(final String indexName, final String tableName, final String colName) {
+ try {
+ executeSql(new SqlOperation() {
+ @Override
+ public void execute(Connection connection) throws SQLException {
+ pstat = connection.prepareStatement(getCreateIndexSql(indexName, tableName, colName));
+ pstat.executeUpdate();
+ }
+ });
+ } catch (SQLException ex) {
+ logger.info("Create index failed with message: " + ex.getLocalizedMessage());
+ }
+ }
+
+    /**
+     * A unit of JDBC work handed to {@code executeSql}. Implementations may park their statement and result
+     * set in {@link #pstat} and {@link #rs}; {@code executeSql} closes both after the operation has run.
+     */
+    abstract static class SqlOperation {
+        PreparedStatement pstat = null;
+        ResultSet rs = null;
+
+        /**
+         * Performs the work on the given connection.
+         *
+         * @param connection connection supplied by {@code executeSql}
+         * @throws SQLException if the work fails
+         */
+        public abstract void execute(Connection connection) throws SQLException;
+    }
+
+    /**
+     * Obtains a connection from the connection manager, runs {@code operation} on it and counts the
+     * operation in {@code queriedSqlNum} when it completes without throwing. The operation's result set,
+     * its statement and the connection are closed quietly in every case.
+     *
+     * @param operation work to run
+     * @throws SQLException if obtaining the connection or the operation itself fails
+     */
+    private void executeSql(SqlOperation operation) throws SQLException {
+        Connection conn = null;
+        try {
+            conn = connectionManager.getConn();
+            operation.execute(conn);
+            queriedSqlNum++;
+        } finally {
+            // close in reverse order of acquisition: result set, statement, connection
+            JDBCConnectionManager.closeQuietly(operation.rs);
+            JDBCConnectionManager.closeQuietly(operation.pstat);
+            JDBCConnectionManager.closeQuietly(conn);
+        }
+    }
+
+    /** Formats the dialect's check-table-exists template with the table name. */
+    private String getCheckTableExistsSql(final String tableName) {
+        return MessageFormat.format(jdbcSqlQueryFormat.getCheckTableExistsSql(), tableName);
+    }
+
+    /** Formats the create-table template with the table name and the key, timestamp and content column names. */
+    private String getCreateIfNeededSql(String tableName) {
+        return MessageFormat.format(jdbcSqlQueryFormat.getCreateIfNeedSql(), tableName, META_TABLE_KEY,
+                META_TABLE_TS, META_TABLE_CONTENT);
+    }
+
+    /** Formats the create-index template with the index name, the table name and the indexed column. */
+    private String getCreateIndexSql(String indexName, String tableName, String indexCol) {
+        return MessageFormat.format(jdbcSqlQueryFormat.getCreateIndexSql(), indexName, tableName, indexCol);
+    }
+
+ private String getKeyEqualSqlString(String tableName, boolean fetchContent, boolean fetchTimestamp) {
+ String sql = MessageFormat.format(jdbcSqlQueryFormat.getKeyEqualsSql(),
+ getSelectList(fetchContent, fetchTimestamp), tableName, META_TABLE_KEY);
+ return sql;
+ }
+
+    /** Formats the delete template with the table name and the key column. */
+    private String getDeletePstatSql(String tableName) {
+        return MessageFormat.format(jdbcSqlQueryFormat.getDeletePstatSql(), tableName, META_TABLE_KEY);
+    }
+
+    /** Formats the list-resource template: the key column is both the selected and the filtered column. */
+    private String getListResourceSqlString(String tableName) {
+        return MessageFormat.format(jdbcSqlQueryFormat.getListResourceSql(), META_TABLE_KEY, tableName,
+                META_TABLE_KEY);
+    }
+
+ private String getAllResourceSqlString(String tableName) {
+ String sql = MessageFormat.format(jdbcSqlQueryFormat.getAllResourceSql(), getSelectList(true, true), tableName,
+ META_TABLE_KEY, META_TABLE_TS, META_TABLE_TS);
+ return sql;
+ }
+
+    /** Formats the replace template with the table name and the timestamp, content and key column names. */
+    private String getReplaceSql(String tableName) {
+        return MessageFormat.format(jdbcSqlQueryFormat.getReplaceSql(), tableName, META_TABLE_TS,
+                META_TABLE_CONTENT, META_TABLE_KEY);
+    }
+
+    /** Formats the insert template with the table name and the key, timestamp and content column names. */
+    private String getInsertSql(String tableName) {
+        return MessageFormat.format(jdbcSqlQueryFormat.getInsertSql(), tableName, META_TABLE_KEY, META_TABLE_TS,
+                META_TABLE_CONTENT);
+    }
+
+    /** Formats the replace-without-content template with the table name, the timestamp column and the key column. */
+    @SuppressWarnings("unused")
+    private String getReplaceSqlWithoutContent(String tableName) {
+        return MessageFormat.format(jdbcSqlQueryFormat.getReplaceSqlWithoutContent(), tableName, META_TABLE_TS,
+                META_TABLE_KEY);
+    }
+
+    /** Formats the insert-without-content template with the table name, the key column and the timestamp column. */
+    private String getInsertSqlWithoutContent(String tableName) {
+        return MessageFormat.format(jdbcSqlQueryFormat.getInsertSqlWithoutContent(), tableName, META_TABLE_KEY,
+                META_TABLE_TS);
+    }
+
+    /**
+     * Formats the timestamp-only update template used by the check-and-put: the timestamp column is both
+     * the column being set and, together with the key column, part of the condition.
+     */
+    private String getUpdateSqlWithoutContent(String tableName) {
+        return MessageFormat.format(jdbcSqlQueryFormat.getUpdateSqlWithoutContent(), tableName, META_TABLE_TS,
+                META_TABLE_KEY, META_TABLE_TS);
+    }
+
+    /** Formats the content update template with the table name, the content column and the key column. */
+    private String getUpdateContentSql(String tableName) {
+        return MessageFormat.format(jdbcSqlQueryFormat.getUpdateContentSql(), tableName, META_TABLE_CONTENT,
+                META_TABLE_KEY);
+    }
+
+ private String getSelectList(boolean fetchContent, boolean fetchTimestamp) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(META_TABLE_KEY);
+ if (fetchTimestamp)
+ sb.append("," + META_TABLE_TS);
+ if (fetchContent)
+ sb.append("," + META_TABLE_CONTENT);
+ return sb.toString();
+ }
+
+ private InputStream getInputStream(String resPath, ResultSet rs) throws SQLException, IOException {
+ if (rs == null) {
+ return null;
+ }
+ InputStream inputStream = rs.getBlob(META_TABLE_CONTENT) == null ? null
+ : rs.getBlob(META_TABLE_CONTENT).getBinaryStream();
+ if (inputStream != null) {
+ return inputStream;
+ } else {
+ Path redirectPath = bigCellHDFSPath(resPath);
+ FileSystem fileSystem = HadoopUtil.getFileSystem(redirectPath);
+ return fileSystem.open(redirectPath);
+ }
+ }
+
+    /**
+     * Writes {@code largeColumn} to the HDFS file that belongs to {@code resPath}.
+     * <p>
+     * If the file already exists it is first copied to a sibling with the suffix "_old" and then deleted, so
+     * that {@code rollbackLargeCellFromHdfs} can restore it. On any failure a rollback is attempted and the
+     * original failure is rethrown wrapped in a SQLException.
+     *
+     * @param resPath resource key
+     * @param largeColumn payload to write
+     * @return the HDFS path the payload was written to
+     * @throws SQLException wrapping whatever went wrong while writing
+     */
+    private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn) throws SQLException {
+        FSDataOutputStream out = null;
+        Path redirectPath = bigCellHDFSPath(resPath);
+        Path oldPath = new Path(redirectPath.toString() + "_old");
+        try {
+            // Resolved inside the try so that a failure is reported with its real cause instead of being
+            // printed to stderr and surfacing later as a NullPointerException.
+            FileSystem fileSystem = HadoopUtil.getFileSystem(redirectPath);
+            if (fileSystem.exists(redirectPath)) {
+                FileUtil.copy(fileSystem, redirectPath, fileSystem, oldPath, false,
+                        HadoopUtil.getCurrentConfiguration());
+                fileSystem.delete(redirectPath, true);
+                logger.debug("a copy of hdfs file {} is made", redirectPath);
+            }
+            out = fileSystem.create(redirectPath);
+            out.write(largeColumn);
+            // Close explicitly: a failure while closing means the data may not be persisted and must
+            // trigger the rollback below rather than being swallowed by closeQuietly.
+            out.close();
+            out = null;
+            return redirectPath;
+        } catch (Throwable e) {
+            try {
+                rollbackLargeCellFromHdfs(resPath);
+            } catch (Throwable ex) {
+                logger.error("fail to roll back resource " + resPath + " in hdfs", ex);
+            }
+            throw new SQLException(e);
+        } finally {
+            IOUtils.closeQuietly(out);
+        }
+    }
+
+    /**
+     * Restores the HDFS file of {@code resPath} from its "_old" backup. If no backup exists the file is
+     * deleted. When restoring fails, a last attempt is made to delete the file, since a missing file is
+     * preferred over an incomplete one.
+     *
+     * @param resPath resource key
+     * @throws SQLException if the file system cannot be obtained or the rollback fails
+     */
+    public void rollbackLargeCellFromHdfs(String resPath) throws SQLException {
+        Path redirectPath = bigCellHDFSPath(resPath);
+        Path oldPath = new Path(redirectPath.toString() + "_old");
+        final FileSystem fileSystem;
+        try {
+            fileSystem = HadoopUtil.getFileSystem(redirectPath);
+        } catch (IOException e) {
+            // Without a file system nothing can be restored or deleted; report the real cause instead of
+            // printing it to stderr and failing later with a NullPointerException.
+            throw new SQLException(e);
+        }
+        try {
+            if (fileSystem.exists(oldPath)) {
+                FileUtil.copy(fileSystem, oldPath, fileSystem, redirectPath, true, true,
+                        HadoopUtil.getCurrentConfiguration());
+                logger.info("roll back hdfs file {}", resPath);
+            } else {
+                fileSystem.delete(redirectPath, true);
+                logger.warn("no backup for hdfs file {} is found, clean it", resPath);
+            }
+        } catch (Throwable e) {
+
+            try {
+                //last try to delete redirectPath, because we prefer a deleted rather than incomplete
+                fileSystem.delete(redirectPath, true);
+            } catch (Throwable ignore) {
+                // ignore it
+            }
+
+            throw new SQLException(e);
+        }
+    }
+
+    /**
+     * Deletes the "_old" backup of the HDFS file of {@code resPath}, if there is one. This is best effort:
+     * any failure is logged as a warning and the backup is left behind as garbage.
+     *
+     * @param resPath resource key
+     * @throws SQLException declared for callers; failures inside this method are logged, not thrown
+     */
+    private void cleanOldLargeCellFromHdfs(String resPath) throws SQLException {
+        Path redirectPath = bigCellHDFSPath(resPath);
+        Path oldPath = new Path(redirectPath.toString() + "_old");
+        try {
+            // Resolved inside the try so that a failure is logged with its real cause instead of being
+            // printed to stderr and showing up as a NullPointerException.
+            FileSystem fileSystem = HadoopUtil.getFileSystem(redirectPath);
+            if (fileSystem.exists(oldPath)) {
+                fileSystem.delete(oldPath, true);
+            }
+        } catch (Throwable e) {
+            logger.warn("error cleaning the backup file for " + redirectPath + ", leave it as garbage", e);
+        }
+    }
+
+    /**
+     * Maps a resource key to its HDFS location: "resources-jdbc" followed by {@code resPath}, resolved
+     * against the configured big-cell directory.
+     *
+     * @param resPath resource key
+     * @return HDFS path used for the resource's large content
+     */
+    public Path bigCellHDFSPath(String resPath) {
+        final String bigCellDir = this.kylinConfig.getMetastoreBigCellHdfsDirectory();
+        return new Path(bigCellDir, "resources-jdbc" + resPath);
+    }
+
+ /**
+ * Returns the current value of the {@code queriedSqlNum} counter, which {@code executeSql} increments
+ * after each operation that completes without throwing.
+ */
+ public long getQueriedSqlNum() {
+ return queriedSqlNum;
+ }
+
+    /**
+     * Picks the table a resource is stored in. Paths under the bad-query, cube-statistics, dictionary,
+     * execute, execute-output, ext-snapshot and temp-statement roots go to the second table; everything
+     * else goes to the first.
+     *
+     * @param resPath resource key
+     * @return {@code tablesName[1]} for the roots listed above, otherwise {@code tablesName[0]}
+     */
+    public String getMetaTableName(String resPath) {
+        final String[] secondTableRoots = { ResourceStore.BAD_QUERY_RESOURCE_ROOT,
+                ResourceStore.CUBE_STATISTICS_ROOT, ResourceStore.DICT_RESOURCE_ROOT,
+                ResourceStore.EXECUTE_RESOURCE_ROOT, ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT,
+                ResourceStore.EXT_SNAPSHOT_RESOURCE_ROOT, ResourceStore.TEMP_STATMENT_RESOURCE_ROOT };
+        for (String root : secondTableRoots) {
+            if (resPath.startsWith(root)) {
+                return tablesName[1];
+            }
+        }
+        return tablesName[0];
+    }
+
+}
\ No newline at end of file
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java
new file mode 100644
index 0000000..b62b33f
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.persistence;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.commons.io.IOUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.StorageURL;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
+/**
+ * {@link ResourceStore} implementation that keeps metadata in relational tables through
+ * {@link JDBCResourceDAO}.
+ * <p>
+ * Two table names are derived from the identifier of the metadata URL: the identifier itself and the
+ * identifier suffixed with "1". Every SQLException raised by the DAO is rethrown as an IOException.
+ */
+public class JDBCResourceStore extends ResourceStore {
+
+    private static final String JDBC_SCHEME = "jdbc";
+
+    private final String[] tablesName = new String[2];
+
+    private final JDBCResourceDAO resourceDAO;
+
+    /**
+     * @param kylinConfig configuration whose metadata URL must use the "jdbc" scheme
+     * @throws SQLException if the DAO cannot be created
+     */
+    public JDBCResourceStore(KylinConfig kylinConfig) throws SQLException {
+        super(kylinConfig);
+        StorageURL metadataUrl = kylinConfig.getMetadataUrl();
+        checkScheme(metadataUrl);
+        tablesName[0] = metadataUrl.getIdentifier();
+        tablesName[1] = metadataUrl.getIdentifier() + "1";
+        this.resourceDAO = new JDBCResourceDAO(kylinConfig, tablesName);
+    }
+
+    @Override
+    protected boolean existsImpl(String resPath) throws IOException {
+        try {
+            return resourceDAO.existResource(resPath);
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    protected RawResource getResourceImpl(String resPath) throws IOException {
+        return getResourceImpl(resPath, false);
+    }
+
+    /**
+     * Loads content and timestamp of a resource.
+     *
+     * @param resPath resource key
+     * @param isAllowBroken passed through to the DAO
+     * @return the resource, or null if the DAO finds none
+     */
+    protected RawResource getResourceImpl(String resPath, final boolean isAllowBroken) throws IOException {
+        try {
+            JDBCResource resource = resourceDAO.getResource(resPath, true, true, isAllowBroken);
+            if (resource != null)
+                return new RawResource(resource.getContent(), resource.getTimestamp());
+            else
+                return null;
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+    }
+
+    /** Returns the stored timestamp of the resource, or 0 if the DAO finds no such resource. */
+    @Override
+    protected long getResourceTimestampImpl(String resPath) throws IOException {
+        try {
+            JDBCResource resource = resourceDAO.getResource(resPath, false, true);
+            if (resource != null) {
+                return resource.getTimestamp();
+            } else {
+                return 0L;
+            }
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+    }
+
+    /** Lists the resources under a folder; returns null (not an empty set) when nothing is found. */
+    @Override
+    protected NavigableSet<String> listResourcesImpl(String folderPath, boolean recursive) throws IOException {
+        try {
+            final TreeSet<String> result = resourceDAO.listAllResource(makeFolderPath(folderPath), recursive);
+            return result.isEmpty() ? null : result;
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    protected List<RawResource> getAllResourcesImpl(String folderPath, long timeStart, long timeEndExclusive)
+            throws IOException {
+        return getAllResourcesImpl(folderPath, timeStart, timeEndExclusive, false);
+    }
+
+    @Override
+    protected List<RawResource> getAllResourcesImpl(String folderPath, long timeStart, long timeEndExclusive,
+            final boolean isAllowBroken) throws IOException {
+        final List<RawResource> result = Lists.newArrayList();
+        try {
+            List<JDBCResource> allResource = resourceDAO.getAllResource(makeFolderPath(folderPath), timeStart,
+                    timeEndExclusive, isAllowBroken);
+            for (JDBCResource resource : allResource) {
+                result.add(new RawResource(resource.getContent(), resource.getTimestamp()));
+            }
+            return result;
+        } catch (SQLException e) {
+            // do not leak the streams collected so far
+            for (RawResource rawResource : result) {
+                IOUtils.closeQuietly(rawResource.inputStream);
+            }
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    protected void putResourceImpl(String resPath, InputStream content, long ts) throws IOException {
+        try {
+            JDBCResource resource = new JDBCResource(resPath, ts, content);
+            resourceDAO.putResource(resource);
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+    }
+
+    /** Delegates the compare-and-set write to the DAO and returns {@code newTS} on success. */
+    @Override
+    protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS)
+            throws IOException, WriteConflictException {
+        try {
+            resourceDAO.checkAndPutResource(resPath, content, oldTS, newTS);
+            return newTS;
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    protected void deleteResourceImpl(String resPath) throws IOException {
+        try {
+            resourceDAO.deleteResource(resPath);
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Describes where a resource lives, as {@code table(key='resPath')@metadataUrl}.
+     * The table is the one the DAO selects for {@code resPath}; concatenating the {@code tablesName}
+     * array directly would only print the array's identity string.
+     */
+    @Override
+    protected String getReadableResourcePathImpl(String resPath) {
+        return resourceDAO.getMetaTableName(resPath) + "(key='" + resPath + "')@" + kylinConfig.getMetadataUrl();
+    }
+
+    /** Ensures the folder path is absolute and ends with "/" so it can serve as a key prefix. */
+    private String makeFolderPath(String folderPath) {
+        Preconditions.checkState(folderPath.startsWith("/"));
+        String lookForPrefix = folderPath.endsWith("/") ? folderPath : folderPath + "/";
+        return lookForPrefix;
+    }
+
+    protected JDBCResourceDAO getResourceDAO() {
+        return resourceDAO;
+    }
+
+    public long getQueriedSqlNum() {
+        return resourceDAO.getQueriedSqlNum();
+    }
+
+    /** Fails with an IllegalStateException unless the URL scheme is "jdbc". */
+    public static void checkScheme(StorageURL url) {
+        Preconditions.checkState(JDBC_SCHEME.equals(url.getScheme()));
+    }
+}
\ No newline at end of file
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCSqlQueryFormat.java b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCSqlQueryFormat.java
new file mode 100644
index 0000000..d3d70c3
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCSqlQueryFormat.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.persistence;
+
+import java.util.Properties;
+
+/**
+ * Read-only view of the SQL statement templates of one database dialect. Each getter looks up one
+ * "format.sql.*" property and fails with a RuntimeException if the property is missing.
+ */
+public class JDBCSqlQueryFormat {
+    private final Properties sqlQueries;
+
+    /**
+     * @param props properties holding the "format.sql.*" templates
+     */
+    public JDBCSqlQueryFormat(Properties props) {
+        this.sqlQueries = props;
+    }
+
+    /** Returns the template stored under {@code key}; a missing key is an error. */
+    private String lookup(String key) {
+        final String template = sqlQueries.getProperty(key);
+        if (template != null) {
+            return template;
+        }
+        throw new RuntimeException(String.format("Property '%s' not found", key));
+    }
+
+    public String getCreateIfNeedSql() {
+        return lookup("format.sql.create-if-need");
+    }
+
+    public String getKeyEqualsSql() {
+        return lookup("format.sql.key-equals");
+    }
+
+    public String getDeletePstatSql() {
+        return lookup("format.sql.delete-pstat");
+    }
+
+    public String getListResourceSql() {
+        return lookup("format.sql.list-resource");
+    }
+
+    public String getAllResourceSql() {
+        return lookup("format.sql.all-resource");
+    }
+
+    public String getReplaceSql() {
+        return lookup("format.sql.replace");
+    }
+
+    public String getInsertSql() {
+        return lookup("format.sql.insert");
+    }
+
+    public String getReplaceSqlWithoutContent() {
+        return lookup("format.sql.replace-without-content");
+    }
+
+    public String getInsertSqlWithoutContent() {
+        return lookup("format.sql.insert-without-content");
+    }
+
+    public String getUpdateSqlWithoutContent() {
+        return lookup("format.sql.update-without-content");
+    }
+
+    public String getUpdateContentSql() {
+        return lookup("format.sql.update-content");
+    }
+
+    public String getTestCreateSql() {
+        return lookup("format.sql.test.create");
+    }
+
+    public String getTestDropSql() {
+        return lookup("format.sql.test.drop");
+    }
+
+    public String getCreateIndexSql() {
+        return lookup("format.sql.create-index");
+    }
+
+    public String getCheckTableExistsSql() {
+        return lookup("format.sql.check-table-exists");
+    }
+}
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCSqlQueryFormatProvider.java b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCSqlQueryFormatProvider.java
new file mode 100644
index 0000000..bcbc79c
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCSqlQueryFormatProvider.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.persistence;
+
+import org.apache.commons.io.IOUtils;
+
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Loads and caches the SQL template files ("/metadata-jdbc-&lt;dialect&gt;.properties") of the JDBC
+ * metadata store and wraps them in {@link JDBCSqlQueryFormat} instances.
+ */
+public class JDBCSqlQueryFormatProvider {
+    static Map<String, Properties> cache = new HashMap<>();
+
+    /**
+     * Returns the SQL templates of the given dialect, loading them from the classpath on first use.
+     * The method is synchronized because the cache is a plain HashMap shared by all callers.
+     *
+     * @param dialect dialect name; it is lower-cased to build the resource name
+     * @return a format object backed by the dialect's properties
+     * @throws RuntimeException if the properties resource is missing or cannot be loaded
+     */
+    public static synchronized JDBCSqlQueryFormat createJDBCSqlQueriesFormat(String dialect) {
+        // Locale.ROOT keeps the resource name independent of the JVM's default locale.
+        String key = String.format("/metadata-jdbc-%s.properties", dialect.toLowerCase(java.util.Locale.ROOT));
+        Properties cached = cache.get(key);
+        if (cached != null) {
+            return new JDBCSqlQueryFormat(cached);
+        }
+
+        // Resolve the resource through this class, i.e. through the class loader that loaded this
+        // library, rather than through java.util.Properties, which belongs to the JDK itself.
+        InputStream input = JDBCSqlQueryFormatProvider.class.getResourceAsStream(key);
+        if (input == null) {
+            throw new RuntimeException(String.format("Can't find properties named %s for metastore", key));
+        }
+        try {
+            Properties props = new Properties();
+            props.load(input);
+            // an empty file is not cached, so it is read again on the next call
+            if (!props.isEmpty()) {
+                cache.put(key, props);
+            }
+            return new JDBCSqlQueryFormat(props);
+        } catch (Exception e) {
+            throw new RuntimeException(String.format("Can't find properties named %s for metastore", key), e);
+        } finally {
+            IOUtils.closeQuietly(input);
+        }
+    }
+}
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
index 36bb595..1262680 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
@@ -231,6 +231,11 @@ abstract public class ResourceStore {
*/
abstract protected List<RawResource> getAllResourcesImpl(String folderPath, long timeStart, long timeEndExclusive) throws IOException;
+ /**
+ * Variant of {@code getAllResourcesImpl} that additionally receives {@code isAllowBroken}.
+ * This default implementation ignores the flag and delegates to the three-argument version;
+ * subclasses may override it to honor the flag.
+ */
+ protected List<RawResource> getAllResourcesImpl(String folderPath, long timeStart, long timeEndExclusive,
+ boolean isAllowBroken) throws IOException {
+ return getAllResourcesImpl(folderPath, timeStart, timeEndExclusive);
+ }
+
/**
* returns null if not exists
*/
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index 3aef34a..7bc7387 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -84,6 +84,16 @@ public class HadoopUtil {
return getFileSystem(workingPath, conf);
}
+    /**
+     * Returns the file system of the configured read working directory, using the current Hadoop
+     * configuration.
+     *
+     * @throws IOException if the file system cannot be obtained
+     */
+    public static FileSystem getReadFileSystem() throws IOException {
+        return getReadFileSystem(getCurrentConfiguration());
+    }
+
+    /**
+     * Returns the file system of the read working directory taken from the environment's KylinConfig.
+     *
+     * @param conf Hadoop configuration used to resolve the file system
+     * @throws IOException if the file system cannot be obtained
+     */
+    public static FileSystem getReadFileSystem(Configuration conf) throws IOException {
+        final String readWorkingDir = KylinConfig.getInstanceFromEnv().getReadHdfsWorkingDirectory(null);
+        return getFileSystem(new Path(readWorkingDir), conf);
+    }
+
public static FileSystem getFileSystem(String path) throws IOException {
return getFileSystem(new Path(makeURI(path)));
}
diff --git a/core-common/src/main/resources/metadata-jdbc-mysql.properties b/core-common/src/main/resources/metadata-jdbc-mysql.properties
new file mode 100644
index 0000000..7be6a1b
--- /dev/null
+++ b/core-common/src/main/resources/metadata-jdbc-mysql.properties
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+###JDBC METASTORE
+
+format.sql.create-if-need=create table if not exists {0} ( {1} VARCHAR(255) primary key, {2} BIGINT, {3} LONGBLOB )
+format.sql.key-equals=select {0} from {1} where {2} = ?
+format.sql.delete-pstat=delete from {0} where {1} = ?
+format.sql.list-resource=select {0} from {1} where {2} like ?
+format.sql.all-resource=select {0} from {1} where {2} like ? and {3} >= ? and {4} < ?
+format.sql.replace=update {0} set {1} = ?,{2} = ? where {3} = ?
+format.sql.insert=replace into {0}({1},{2},{3}) values(?,?,?)
+format.sql.replace-without-content=update {0} set {1} = ? where {2} = ?
+format.sql.insert-without-content=replace into {0}({1},{2}) values(?,?)
+format.sql.update-without-content=update {0} set {1}=? where {2}=? and {3}=?
+format.sql.update-content=update {0} set {1}=? where {2}=?
+format.sql.test.create=create table if not exists {0} (name VARCHAR(255) primary key, id BIGINT)
+format.sql.test.drop=drop table if exists {0}
+format.sql.create-index=create index {0} on {1} ({2})
+format.sql.check-table-exists=show tables
\ No newline at end of file
diff --git a/kylin-it/pom.xml b/kylin-it/pom.xml
index a3e7e68..8dc6532 100644
--- a/kylin-it/pom.xml
+++ b/kylin-it/pom.xml
@@ -131,6 +131,11 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>xerces</groupId>
<artifactId>xercesImpl</artifactId>
<scope>test</scope>
diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/jdbc/ITJDBCResourceStoreTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/jdbc/ITJDBCResourceStoreTest.java
new file mode 100644
index 0000000..e12ecb1
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/storage/jdbc/ITJDBCResourceStoreTest.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.jdbc;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.StorageURL;
+import org.apache.kylin.common.persistence.JDBCConnectionManager;
+import org.apache.kylin.common.persistence.JDBCResourceStore;
+import org.apache.kylin.common.persistence.JDBCSqlQueryFormat;
+import org.apache.kylin.common.persistence.JDBCSqlQueryFormatProvider;
+import org.apache.kylin.common.persistence.RawResource;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.ResourceStoreTest;
+import org.apache.kylin.common.persistence.ResourceTool;
+import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.common.persistence.StringEntity;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.log4j.component.helpers.MessageFormatter;
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.Statement;
+import java.text.MessageFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.UUID;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class ITJDBCResourceStoreTest extends HBaseMetadataTestCase {
+ private static final Logger logger = LoggerFactory.getLogger(ITJDBCResourceStoreTest.class);
+
+ private static final String LARGE_CELL_PATH = "/cube/_test_large_cell.json"; // metadata path used as a scratch resource by the tests
+ private static final String Large_Content = "THIS_IS_A_LARGE_CELL"; // payload stored by testJDBCStoreWithLargeCell
+ private KylinConfig kylinConfig; // environment config; setup() points its metadata URL at the JDBC store
+ private JDBCConnectionManager connectionManager; // obtained in setup(); hands out the JDBC connections used by the tests
+ private final String jdbcMetadataUrlNoIdentifier = "@jdbc,url=jdbc:mysql://localhost:3306/kylin_it,username=root,password=,maxActive=10,maxIdle=10"; // URL tail; an identifier is prepended before use
+ private final String mainIdentifier = "kylin_default_instance"; // identifier prepended for the store configured in setup()
+ private final String copyIdentifier = "kylin_default_instance_copy"; // identifier prepended for the copy target in testPerformanceWithResourceTool
+ private StorageURL metadataUrlBackup; // metadata URL captured in setup() and put back in after()
+ private boolean jdbcConnectable = false; // flipped to true by setup(); every JDBC test is skipped while it is false
+
+ /**
+  * Prepares the test metadata, points the environment config at the JDBC metadata URL and
+  * drops the tables named after both identifiers so every test starts from a clean store.
+  *
+  * <p>A RuntimeException raised while connecting or initialising is treated as "no database
+  * available": it is logged together with its cause and {@code jdbcConnectable} stays
+  * {@code false}, which makes the individual tests skip themselves through Assume.
+  *
+  * @throws Exception any checked failure from metadata preparation or JDBC calls
+  */
+ @Before
+ public void setup() throws Exception {
+     this.createTestMetadata();
+     kylinConfig = KylinConfig.getInstanceFromEnv();
+     // Snapshot taken before the URL is switched; used below as the copy source.
+     KylinConfig configBackup = KylinConfig.createKylinConfig(kylinConfig);
+     metadataUrlBackup = kylinConfig.getMetadataUrl();
+     kylinConfig.setMetadataUrl(mainIdentifier + jdbcMetadataUrlNoIdentifier);
+     JDBCSqlQueryFormat sqlQueryFormat = JDBCSqlQueryFormatProvider
+             .createJDBCSqlQueriesFormat(KylinConfig.getInstanceFromEnv().getMetadataDialect());
+     Statement statement = null;
+     Connection conn = null;
+     try {
+         connectionManager = JDBCConnectionManager.getConnectionManager();
+         conn = connectionManager.getConn();
+         statement = conn.createStatement();
+         String sql = MessageFormat.format(sqlQueryFormat.getTestDropSql(), mainIdentifier);
+         statement.executeUpdate(sql);
+         sql = MessageFormat.format(sqlQueryFormat.getTestDropSql(), copyIdentifier);
+         statement.executeUpdate(sql);
+         ResourceTool.copy(configBackup, kylinConfig);
+         // Only mark the store usable once initialisation has fully succeeded; otherwise the
+         // "skip test cases" message below would be logged while the tests still ran.
+         jdbcConnectable = true;
+     } catch (RuntimeException ex) {
+         // Keep the cause in the log so a skipped run can be diagnosed.
+         logger.info("Init connection manager failed, skip test cases", ex);
+     } finally {
+         JDBCConnectionManager.closeQuietly(statement);
+         JDBCConnectionManager.closeQuietly(conn);
+     }
+ }
+
+ /**
+  * Cleans up the test metadata and puts back the metadata URL captured in setup().
+  *
+  * <p>The URL is restored in a finally block so a failing cleanup cannot leave the config
+  * pointing at the JDBC store. The null checks cover the case where setup() failed before
+  * either field was assigned, so that the original failure is not masked by an NPE here.
+  *
+  * @throws Exception any failure raised by cleanupTestMetadata()
+  */
+ @After
+ public void after() throws Exception {
+     try {
+         this.cleanupTestMetadata();
+     } finally {
+         if (kylinConfig != null && metadataUrlBackup != null) {
+             kylinConfig.setMetadataUrl(metadataUrlBackup.toString());
+         }
+     }
+ }
+
+ /**
+  * Asks the connection manager for a connection and checks that one is returned.
+  * Skipped unless setup() marked the database as connectable.
+  */
+ @Test
+ public void testConnectJDBC() throws Exception {
+     Assume.assumeTrue(jdbcConnectable);
+     Connection borrowed = null;
+     try {
+         borrowed = connectionManager.getConn();
+         assertNotNull(borrowed);
+     } finally {
+         // Always hand the connection back, even when the assertion fails.
+         JDBCConnectionManager.closeQuietly(borrowed);
+     }
+ }
+
+ /**
+  * Runs a drop / create / drop sequence for a scratch table named {@code test} to check
+  * that plain DDL statements execute over a connection from the manager.
+  * Skipped unless setup() marked the database as connectable.
+  */
+ @Test
+ public void testJdbcBasicFunction() throws Exception {
+     Assume.assumeTrue(jdbcConnectable);
+     final String dropTableSql = "DROP TABLE IF EXISTS test";
+     final String createTableSql = "CREATE TABLE test(col1 VARCHAR (10), col2 INTEGER )";
+     // Leading drop clears leftovers from an earlier run; trailing drop cleans up after this one.
+     final String[] script = { dropTableSql, createTableSql, dropTableSql };
+     Connection connection = null;
+     Statement ddl = null;
+     try {
+         connection = connectionManager.getConn();
+         ddl = connection.createStatement();
+         for (String sql : script) {
+             ddl.executeUpdate(sql);
+         }
+     } finally {
+         JDBCConnectionManager.closeQuietly(ddl);
+         JDBCConnectionManager.closeQuietly(connection);
+     }
+ }
+
+ // Support databases other than MySQL
+ // @Test
+ // public void testGetDbcpProperties() {
+ // Properties prop = JDBCConnectionManager.getConnectionManager().getDbcpProperties();
+ // assertEquals("com.mysql.jdbc.Driver", prop.get("driverClassName"));
+ // }
+
+ @Test
+ public void testMsgFormatter() {
+ System.out.println(MessageFormatter.format("{}:{}", "a", "b"));
+ }
+
+ @Test
+ public void testResourceStoreBasic() throws Exception {
+ Assume.assumeTrue(jdbcConnectable);
+ ResourceStoreTest.testAStore(
+ ResourceStoreTest.mockUrl(
+ StringUtils.substringAfterLast(mainIdentifier + jdbcMetadataUrlNoIdentifier, "@"), kylinConfig),
+ kylinConfig);
+ }
+
+ /**
+  * Writes a StringEntity to {@code /large/large.json} through a JDBCResourceStore, reads it
+  * back and checks that it exists and equals what was written.
+  *
+  * <p>Cleanup removes the resource this test actually wrote (previously a different path was
+  * deleted, leaving the written resource behind) and restores the metadata URL in a finally
+  * block so a failed assertion cannot leak the mocked URL into later code.
+  * Skipped unless setup() marked the database as connectable.
+  */
+ @Test
+ public void testJDBCStoreWithLargeCell() throws Exception {
+     Assume.assumeTrue(jdbcConnectable);
+     JDBCResourceStore store = null;
+     StringEntity content = new StringEntity(Large_Content);
+     String largePath = "/large/large.json";
+     String oldUrl = ResourceStoreTest.replaceMetadataUrl(kylinConfig,
+             ResourceStoreTest.mockUrl("jdbc", kylinConfig));
+     try {
+         store = new JDBCResourceStore(KylinConfig.getInstanceFromEnv());
+         // Start from a known state in case an earlier run left the resource behind.
+         store.deleteResource(largePath);
+         store.putResource(largePath, content, StringEntity.serializer);
+         assertTrue(store.exists(largePath));
+         StringEntity t = store.getResource(largePath, StringEntity.class, StringEntity.serializer);
+         assertEquals(content, t);
+     } finally {
+         try {
+             if (store != null)
+                 store.deleteResource(largePath);
+         } finally {
+             ResourceStoreTest.replaceMetadataUrl(kylinConfig, oldUrl);
+         }
+     }
+ }
+
+ /**
+  * Runs the shared ResourceStoreTest.testPerformance routine first against the mocked
+  * "jdbc" url and then against the mocked "hbase" url.
+  * Skipped unless setup() marked the database as connectable.
+  */
+ @Test
+ public void testPerformance() throws Exception {
+     Assume.assumeTrue(jdbcConnectable);
+     final String[] storeTags = { "jdbc", "hbase" };
+     for (String tag : storeTags) {
+         ResourceStoreTest.testPerformance(ResourceStoreTest.mockUrl(tag, kylinConfig), kylinConfig);
+     }
+ }
+
+ /**
+  * Round-trips a 500 KiB all-zero ByteEntity through a JDBCResourceStore at
+  * LARGE_CELL_PATH and checks that it exists and equals what was written.
+  *
+  * <p>The resource is removed and the metadata URL restored in finally blocks, so a failed
+  * assertion neither leaves the resource behind nor leaks the mocked URL.
+  * Skipped unless setup() marked the database as connectable.
+  */
+ @Test
+ public void testMaxCell() throws Exception {
+     Assume.assumeTrue(jdbcConnectable);
+     // A freshly allocated byte[] is already zero-filled, so no explicit fill loop is needed.
+     byte[] data = new byte[500 * 1024];
+     JDBCResourceStore store = null;
+     ByteEntity content = new ByteEntity(data);
+     String oldUrl = ResourceStoreTest.replaceMetadataUrl(kylinConfig,
+             ResourceStoreTest.mockUrl("jdbc", kylinConfig));
+     try {
+         store = new JDBCResourceStore(KylinConfig.getInstanceFromEnv());
+         // Start from a known state in case an earlier run left the resource behind.
+         store.deleteResource(LARGE_CELL_PATH);
+         store.putResource(LARGE_CELL_PATH, content, ByteEntity.serializer);
+         assertTrue(store.exists(LARGE_CELL_PATH));
+         ByteEntity t = store.getResource(LARGE_CELL_PATH, ByteEntity.class, ByteEntity.serializer);
+         assertEquals(content, t);
+     } finally {
+         try {
+             if (store != null)
+                 store.deleteResource(LARGE_CELL_PATH);
+         } finally {
+             ResourceStoreTest.replaceMetadataUrl(kylinConfig, oldUrl);
+         }
+     }
+ }
+
+ /**
+  * Multiplies the existing execute / execute_output resources (200 copies of each execute
+  * entry under fresh UUIDs), copies the whole store to a second JDBC store with
+  * ResourceTool and then checks that
+  * <ul>
+  * <li>both stores list the same number of execute and execute_output resources,</li>
+  * <li>the copy issued fewer SQL statements than the data generation did, and</li>
+  * <li>the whole run finished within 10 minutes.</li>
+  * </ul>
+  *
+  * <p>Each source resource is read into memory once and every write gets its own stream over
+  * those bytes; an InputStream can generally be consumed only once, so handing the same
+  * stream to repeated putResource calls would store the real content at most the first time.
+  * Skipped unless setup() marked the database as connectable.
+  */
+ @Test
+ public void testPerformanceWithResourceTool() throws Exception {
+     Assume.assumeTrue(jdbcConnectable);
+     KylinConfig tmpConfig = KylinConfig.createKylinConfig(KylinConfig.getInstanceFromEnv());
+     tmpConfig.setMetadataUrl(copyIdentifier + jdbcMetadataUrlNoIdentifier);
+
+     JDBCResourceStore store = (JDBCResourceStore) ResourceStore.getStore(kylinConfig);
+     NavigableSet<String> executes = store.listResources(ResourceStore.EXECUTE_RESOURCE_ROOT);
+     NavigableSet<String> executeOutputs = store.listResources(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT);
+
+     long startTs = System.currentTimeMillis();
+
+     for (String execute : executes) {
+         String uuid = StringUtils.substringAfterLast(execute, "/");
+         RawResource executeResource = store.getResource(execute);
+         byte[] executeBytes = drainToBytes(executeResource.inputStream);
+
+         // Output resources belonging to this execute entry, keyed by the path suffix that
+         // follows the uuid, with their content buffered for repeated writes.
+         Map<String, byte[]> outputBytesByStep = new HashMap<>();
+         for (String executeOutput : executeOutputs) {
+             if (executeOutput.contains(uuid)) {
+                 RawResource executeOutputResource = store.getResource(executeOutput);
+                 outputBytesByStep.put(StringUtils.substringAfterLast(executeOutput, uuid),
+                         drainToBytes(executeOutputResource.inputStream));
+             }
+         }
+
+         for (int i = 0; i < 200; i++) {
+             String newUuid = UUID.randomUUID().toString();
+             store.putResource(ResourceStore.EXECUTE_RESOURCE_ROOT + "/" + newUuid,
+                     new java.io.ByteArrayInputStream(executeBytes), System.currentTimeMillis());
+
+             for (Map.Entry<String, byte[]> output : outputBytesByStep.entrySet()) {
+                 store.putResource(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" + newUuid + output.getKey(),
+                         new java.io.ByteArrayInputStream(output.getValue()), System.currentTimeMillis());
+             }
+         }
+     }
+
+     long queryNumBeforeCopy = store.getQueriedSqlNum();
+     ResourceTool.copy(kylinConfig, tmpConfig);
+     long endTs = System.currentTimeMillis();
+     long queryNumAfterCopy = store.getQueriedSqlNum();
+     JDBCResourceStore resourceStoreCopy = (JDBCResourceStore) ResourceStore.getStore(tmpConfig);
+
+     int executeNum = store.listResources("/execute").size();
+     int executeOutputNum = store.listResources("/execute_output").size();
+
+     assertEquals(executeNum, resourceStoreCopy.listResources("/execute").size());
+     assertEquals(executeOutputNum, resourceStoreCopy.listResources("/execute_output").size());
+
+     SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+     String startTime = sdf.format(new Date(startTs));
+     String endTime = sdf.format(new Date(endTs));
+
+     logger.info("Test performance with ResourceTool done during " + startTime + " to " + endTime);
+     logger.info("Now there is " + executeNum + " execute data and " + executeOutputNum
+             + " execute_output data in resource store.");
+     logger.info("Resource store run " + queryNumBeforeCopy + " sqls for metadata generation, and "
+             + (queryNumAfterCopy - queryNumBeforeCopy) + " sqls for copy with ResourceTool.");
+     assertTrue((queryNumAfterCopy - queryNumBeforeCopy) < queryNumBeforeCopy);
+     logger.info("This test is expected to be done in 10 mins.");
+     assertTrue((endTs - startTs) < 600000);
+ }
+
+ /** Reads the stream to its end, closes it and returns everything that was read. */
+ private static byte[] drainToBytes(java.io.InputStream in) throws IOException {
+     try {
+         java.io.ByteArrayOutputStream buffer = new java.io.ByteArrayOutputStream();
+         byte[] chunk = new byte[8192];
+         int read;
+         while ((read = in.read(chunk)) != -1) {
+             buffer.write(chunk, 0, read);
+         }
+         return buffer.toByteArray();
+     } finally {
+         in.close();
+     }
+ }
+
+ /**
+  * Test-only entity that wraps a raw byte array, used to push large binary payloads through
+  * a resource store.
+  *
+  * <p>Wire format written by {@link #serializer}: a 4-byte length (DataOutputStream.writeInt)
+  * followed by exactly that many payload bytes.
+  */
+ @SuppressWarnings("serial")
+ public static class ByteEntity extends RootPersistentEntity {
+
+     public static final Serializer<ByteEntity> serializer = new Serializer<ByteEntity>() {
+
+         /** Writes the payload length followed by the payload itself. */
+         @Override
+         public void serialize(ByteEntity obj, DataOutputStream out) throws IOException {
+             byte[] data = obj.getData();
+             out.writeInt(data.length);
+             out.write(data);
+         }
+
+         /**
+          * Reads the length prefix and then the complete payload.
+          *
+          * @throws IOException if the stream ends before the announced number of bytes
+          *         has been read (EOFException) or on any other read failure
+          */
+         @Override
+         public ByteEntity deserialize(DataInputStream in) throws IOException {
+             int length = in.readInt();
+             byte[] bytes = new byte[length];
+             // read(byte[]) may return after filling only part of the array; readFully
+             // keeps reading until the whole payload has arrived.
+             in.readFully(bytes);
+             return new ByteEntity(bytes);
+         }
+     };
+
+     // Payload carried by this entity; null until set when built with the no-arg constructor.
+     byte[] data;
+
+     /** Creates an entity without a payload. */
+     public ByteEntity() {
+
+     }
+
+     /** Creates an entity holding the given array; the array is stored as is, not copied. */
+     public ByteEntity(byte[] data) {
+         this.data = data;
+     }
+
+     /** Returns the shared {@link #serializer} instance. */
+     public static Serializer<ByteEntity> getSerializer() {
+         return serializer;
+     }
+
+     /** Returns the stored array itself, not a copy. */
+     public byte[] getData() {
+         return data;
+     }
+
+     /** Replaces the stored array; the given array is stored as is, not copied. */
+     public void setData(byte[] data) {
+         this.data = data;
+     }
+ }
+}
diff --git a/pom.xml b/pom.xml
index cd18659..0d38f9f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -62,6 +62,9 @@
<spark.version>2.1.2</spark.version>
<kryo.version>4.0.0</kryo.version>
+ <!-- mysql versions -->
+ <mysql-connector.version>5.1.8</mysql-connector.version>
+
<!-- Scala versions -->
<scala.version>2.11.0</scala.version>
@@ -550,6 +553,13 @@
<version>${hbase-hadoop2.version}</version>
<scope>test</scope>
</dependency>
+ <!-- jdbc dependencies -->
+ <dependency>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ <version>${mysql-connector.version}</version>
+ <scope>provided</scope>
+ </dependency>
<!-- Hive dependencies -->
<dependency>
<groupId>org.apache.hive</groupId>