You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/09/03 10:00:45 UTC
[kylin] 03/03: KYLIN-3488 code review
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 101758d121cf7d192cbcfdb066deee5fb984de5d
Author: shaofengshi <sh...@apache.org>
AuthorDate: Sun Sep 2 21:58:08 2018 +0800
KYLIN-3488 code review
---
.../java/org/apache/kylin/common/KylinConfig.java | 22 ---
.../org/apache/kylin/common/KylinConfigBase.java | 107 ++++--------
.../kylin/common/persistence/JDBCResourceDAO.java | 190 +++++++++++----------
.../common/persistence/JDBCResourceStore.java | 24 +--
.../org/apache/kylin/common/util/HadoopUtil.java | 8 +-
.../storage/jdbc/ITJDBCResourceStoreTest.java | 13 +-
6 files changed, 152 insertions(+), 212 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index 468bef7..e09ce26 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -534,26 +534,4 @@ public class KylinConfig extends KylinConfigBase {
return this.base() == ((KylinConfig) another).base();
}
- public String getMetadataDialect() {
- return SYS_ENV_INSTANCE.getOptional("kylin.metadata.jdbc.dialect", "mysql");
- }
-
- public boolean isJsonAlwaysSmallCell() {
- return Boolean.valueOf(SYS_ENV_INSTANCE.getOptional("kylin.metadata.jdbc.json-always-small-cell", "true"));
- }
-
- public int getSmallCellMetadataWarningThreshold() {
- return Integer.parseInt(SYS_ENV_INSTANCE.getOptional("kylin.metadata.jdbc.small-cell-meta-size-warning-threshold",
- String.valueOf(100 << 20)));
- }
-
- public int getSmallCellMetadataErrorThreshold() {
- return Integer.parseInt(
- SYS_ENV_INSTANCE.getOptional("kylin.metadata.jdbc.small-cell-meta-size-error-threshold", String.valueOf(1 << 30)));
- }
-
- public int getJdbcResourceStoreMaxCellSize() {
- return Integer.parseInt(SYS_ENV_INSTANCE.getOptional("kylin.metadata.jdbc.max-cell-size", "262144")); //256k
- }
-
}
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 3300d83..0e9bd93 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -262,60 +262,14 @@ abstract public class KylinConfigBase implements Serializable {
return cachedHdfsWorkingDirectory;
}
- public String getMetastoreBigCellHdfsDirectory() {
-
- if (cachedBigCellDirectory != null)
- return cachedBigCellDirectory;
-
-
- String root = getOptional("kylin.env.hdfs-metastore-bigcell-dir");
-
- if (root == null) {
- return getJdbcHdfsWorkingDirectory();
- }
-
- Path path = new Path(root);
- if (!path.isAbsolute())
- throw new IllegalArgumentException(
- "kylin.env.hdfs-metastore-bigcell-dir must be absolute, but got " + root);
-
- // make sure path is qualified
- try {
- FileSystem fs = HadoopUtil.getReadFileSystem();
- path = fs.makeQualified(path);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- root = new Path(path, StringUtils.replaceChars(getMetadataUrlPrefix(), ':', '-')).toString();
-
- if (!root.endsWith("/"))
- root += "/";
-
- cachedBigCellDirectory = root;
- if (cachedBigCellDirectory.startsWith("file:")) {
- cachedBigCellDirectory = cachedBigCellDirectory.replace("file:", "file://");
- } else if (cachedBigCellDirectory.startsWith("maprfs:")) {
- cachedBigCellDirectory = cachedBigCellDirectory.replace("maprfs:", "maprfs://");
- }
-
- return cachedBigCellDirectory;
- }
-
- private String getJdbcHdfsWorkingDirectory() {
- if (StringUtils.isNotEmpty(getJdbcFileSystem())) {
- Path workingDir = new Path(getReadHdfsWorkingDirectory());
- return new Path(getJdbcFileSystem(), Path.getPathWithoutSchemeAndAuthority(workingDir)).toString() + "/";
+ public String getReadHdfsWorkingDirectory() {
+ if (StringUtils.isNotEmpty(getHBaseClusterFs())) {
+ Path workingDir = new Path(getHdfsWorkingDirectory());
+ return new Path(getHBaseClusterFs(), Path.getPathWithoutSchemeAndAuthority(workingDir)).toString()
+ + "/";
}
- return getReadHdfsWorkingDirectory();
- }
-
- /**
- * Consider use kylin.env.hdfs-metastore-bigcell-dir instead of kylin.storage.columnar.jdbc.file-system
- */
- private String getJdbcFileSystem() {
- return getOptional("kylin.storage.columnar.jdbc.file-system", "");
+ return getHdfsWorkingDirectory();
}
public String getHdfsWorkingDirectory(String project) {
@@ -326,30 +280,6 @@ abstract public class KylinConfigBase implements Serializable {
}
}
- private String getReadHdfsWorkingDirectory() {
- if (StringUtils.isNotEmpty(getParquetReadFileSystem())) {
- Path workingDir = new Path(getHdfsWorkingDirectory());
- return new Path(getParquetReadFileSystem(), Path.getPathWithoutSchemeAndAuthority(workingDir)).toString()
- + "/";
- }
-
- return getHdfsWorkingDirectory();
- }
-
- public String getReadHdfsWorkingDirectory(String project) {
- if (StringUtils.isNotEmpty(getParquetReadFileSystem())) {
- Path workingDir = new Path(getHdfsWorkingDirectory(project));
- return new Path(getParquetReadFileSystem(), Path.getPathWithoutSchemeAndAuthority(workingDir)).toString()
- + "/";
- }
-
- return getHdfsWorkingDirectory(project);
- }
-
- public String getParquetReadFileSystem() {
- return getOptional("kylin.storage.columnar.file-system", "");
- }
-
public String getZookeeperBasePath() {
return getOptional("kylin.env.zookeeper-base-path", "/kylin");
}
@@ -1823,4 +1753,29 @@ abstract public class KylinConfigBase implements Serializable {
public String getAutoMigrateCubeDestConfig() {
return getOptional("kylin.tool.auto-migrate-cube.dest-config", "");
}
+
+ // ============================================================================
+ // jdbc metadata resource store
+ // ============================================================================
+
+ public String getMetadataDialect() {
+ return getOptional("kylin.metadata.jdbc.dialect", "mysql");
+ }
+
+ public boolean isJsonAlwaysSmallCell() {
+ return Boolean.valueOf(getOptional("kylin.metadata.jdbc.json-always-small-cell", "true"));
+ }
+
+ public int getSmallCellMetadataWarningThreshold() {
+ return Integer.parseInt(getOptional("kylin.metadata.jdbc.small-cell-meta-size-warning-threshold",
+ String.valueOf(100 << 20))); //100mb
+ }
+
+ public int getSmallCellMetadataErrorThreshold() {
+ return Integer.parseInt(getOptional("kylin.metadata.jdbc.small-cell-meta-size-error-threshold", String.valueOf(1 << 30))); // 1gb
+ }
+
+ public int getJdbcResourceStoreMaxCellSize() {
+ return Integer.parseInt(getOptional("kylin.metadata.jdbc.max-cell-size", "1048576")); // 1mb
+ }
}
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceDAO.java b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceDAO.java
index 03f255c..7ae61f0 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceDAO.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceDAO.java
@@ -18,17 +18,6 @@
package org.apache.kylin.common.persistence;
-import com.google.common.collect.Lists;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -45,6 +34,18 @@ import java.util.List;
import java.util.Locale;
import java.util.TreeSet;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
public class JDBCResourceDAO {
private static Logger logger = LoggerFactory.getLogger(JDBCResourceDAO.class);
@@ -59,21 +60,30 @@ public class JDBCResourceDAO {
private JDBCSqlQueryFormat jdbcSqlQueryFormat;
- private String[] tablesName;
+ private String[] tableNames = new String[2];
private KylinConfig kylinConfig;
// For test
private long queriedSqlNum = 0;
- public JDBCResourceDAO(KylinConfig kylinConfig, String[] tablesName) throws SQLException {
+ private FileSystem redirectFileSystem;
+
+ public JDBCResourceDAO(KylinConfig kylinConfig, String metadataIdentifier) throws SQLException {
this.kylinConfig = kylinConfig;
this.connectionManager = JDBCConnectionManager.getConnectionManager();
- this.jdbcSqlQueryFormat = JDBCSqlQueryFormatProvider.createJDBCSqlQueriesFormat(kylinConfig.getMetadataDialect());
- this.tablesName = tablesName;
- for (int i = 0; i < tablesName.length; i++) {
- createTableIfNeeded(tablesName[i]);
- createIndex("IDX_" + META_TABLE_TS, tablesName[i], META_TABLE_TS);
+ this.jdbcSqlQueryFormat = JDBCSqlQueryFormatProvider
+ .createJDBCSqlQueriesFormat(kylinConfig.getMetadataDialect());
+ this.tableNames[0] = metadataIdentifier;
+ this.tableNames[1] = metadataIdentifier + "_log";
+ for (int i = 0; i < tableNames.length; i++) {
+ createTableIfNeeded(tableNames[i]);
+ createIndex("IDX_" + META_TABLE_TS, tableNames[i], META_TABLE_TS);
+ }
+ try {
+ redirectFileSystem = HadoopUtil.getReadFileSystem();
+ } catch (IOException e) {
+ throw new SQLException(e);
}
}
@@ -87,7 +97,7 @@ public class JDBCResourceDAO {
}
public JDBCResource getResource(final String resourcePath, final boolean fetchContent, final boolean fetchTimestamp,
- final boolean isAllowBroken) throws SQLException {
+ final boolean isAllowBroken) throws SQLException {
final JDBCResource resource = new JDBCResource();
logger.trace("getResource method. resourcePath : {} , fetchConetent : {} , fetch TS : {}", resourcePath,
fetchContent, fetchTimestamp);
@@ -162,7 +172,7 @@ public class JDBCResourceDAO {
}
public List<JDBCResource> getAllResource(final String folderPath, final long timeStart, final long timeEndExclusive,
- final boolean isAllowBroken) throws SQLException {
+ final boolean isAllowBroken) throws SQLException {
final List<JDBCResource> allResource = Lists.newArrayList();
executeSql(new SqlOperation() {
@Override
@@ -236,12 +246,9 @@ public class JDBCResourceDAO {
}
private void deleteHDFSResourceIfExist(String resourcePath) throws IOException {
-
Path redirectPath = bigCellHDFSPath(resourcePath);
- FileSystem fileSystem = HadoopUtil.getFileSystem(redirectPath);
-
- if (fileSystem.exists(redirectPath)) {
- fileSystem.delete(redirectPath, true);
+ if (redirectFileSystem.exists(redirectPath)) {
+ redirectFileSystem.delete(redirectPath, true);
}
}
@@ -480,79 +487,104 @@ public class JDBCResourceDAO {
}
private String getCheckTableExistsSql(final String tableName) {
- final String sql = new MessageFormat(jdbcSqlQueryFormat.getCheckTableExistsSql(), Locale.ROOT).format(tableName, new StringBuffer(), new FieldPosition(0)).toString();
+ final String sql = new MessageFormat(jdbcSqlQueryFormat.getCheckTableExistsSql(), Locale.ROOT)
+ .format(tableName, new StringBuffer(), new FieldPosition(0)).toString();
return sql;
}
//sql queries
private String getCreateIfNeededSql(String tableName) {
- String sql = new MessageFormat(jdbcSqlQueryFormat.getCreateIfNeedSql(), Locale.ROOT).format(new Object[]{tableName, META_TABLE_KEY,
- META_TABLE_TS, META_TABLE_CONTENT}, new StringBuffer(), new FieldPosition(0)).toString();
+ String sql = new MessageFormat(jdbcSqlQueryFormat.getCreateIfNeedSql(), Locale.ROOT)
+ .format(new Object[] { tableName, META_TABLE_KEY, META_TABLE_TS, META_TABLE_CONTENT },
+ new StringBuffer(), new FieldPosition(0))
+ .toString();
return sql;
}
//sql queries
private String getCreateIndexSql(String indexName, String tableName, String indexCol) {
- String sql = new MessageFormat(jdbcSqlQueryFormat.getCreateIndexSql(), Locale.ROOT).format(new Object[] {indexName, tableName, indexCol}, new StringBuffer(), new FieldPosition(0)).toString();
+ String sql = new MessageFormat(jdbcSqlQueryFormat.getCreateIndexSql(), Locale.ROOT)
+ .format(new Object[] { indexName, tableName, indexCol }, new StringBuffer(), new FieldPosition(0))
+ .toString();
return sql;
}
private String getKeyEqualSqlString(String tableName, boolean fetchContent, boolean fetchTimestamp) {
- String sql = new MessageFormat(jdbcSqlQueryFormat.getKeyEqualsSql(), Locale.ROOT).format(new Object[] {getSelectList(fetchContent, fetchTimestamp), tableName, META_TABLE_KEY}, new StringBuffer(), new FieldPosition(0)).toString();
+ String sql = new MessageFormat(jdbcSqlQueryFormat.getKeyEqualsSql(), Locale.ROOT)
+ .format(new Object[] { getSelectList(fetchContent, fetchTimestamp), tableName, META_TABLE_KEY },
+ new StringBuffer(), new FieldPosition(0))
+ .toString();
return sql;
}
private String getDeletePstatSql(String tableName) {
- String sql = new MessageFormat(jdbcSqlQueryFormat.getDeletePstatSql(), Locale.ROOT).format(new Object[] {tableName, META_TABLE_KEY}, new StringBuffer(), new FieldPosition(0)).toString();
+ String sql = new MessageFormat(jdbcSqlQueryFormat.getDeletePstatSql(), Locale.ROOT)
+ .format(new Object[] { tableName, META_TABLE_KEY }, new StringBuffer(), new FieldPosition(0))
+ .toString();
return sql;
}
private String getListResourceSqlString(String tableName) {
- String sql = new MessageFormat(jdbcSqlQueryFormat.getListResourceSql(), Locale.ROOT).format(new Object[] {META_TABLE_KEY, tableName,
- META_TABLE_KEY}, new StringBuffer(), new FieldPosition(0)).toString();
+ String sql = new MessageFormat(jdbcSqlQueryFormat.getListResourceSql(), Locale.ROOT)
+ .format(new Object[] { META_TABLE_KEY, tableName, META_TABLE_KEY }, new StringBuffer(),
+ new FieldPosition(0))
+ .toString();
return sql;
}
private String getAllResourceSqlString(String tableName) {
- String sql = new MessageFormat(jdbcSqlQueryFormat.getAllResourceSql(), Locale.ROOT).format(new Object[] {getSelectList(true, true), tableName,
- META_TABLE_KEY, META_TABLE_TS, META_TABLE_TS}, new StringBuffer(), new FieldPosition(0)).toString();
+ String sql = new MessageFormat(jdbcSqlQueryFormat.getAllResourceSql(), Locale.ROOT).format(
+ new Object[] { getSelectList(true, true), tableName, META_TABLE_KEY, META_TABLE_TS, META_TABLE_TS },
+ new StringBuffer(), new FieldPosition(0)).toString();
return sql;
}
private String getReplaceSql(String tableName) {
- String sql = new MessageFormat(jdbcSqlQueryFormat.getReplaceSql(), Locale.ROOT).format(new Object[] {tableName, META_TABLE_TS,
- META_TABLE_CONTENT, META_TABLE_KEY}, new StringBuffer(), new FieldPosition(0)).toString();
+ String sql = new MessageFormat(jdbcSqlQueryFormat.getReplaceSql(), Locale.ROOT)
+ .format(new Object[] { tableName, META_TABLE_TS, META_TABLE_CONTENT, META_TABLE_KEY },
+ new StringBuffer(), new FieldPosition(0))
+ .toString();
return sql;
}
private String getInsertSql(String tableName) {
- String sql = new MessageFormat(jdbcSqlQueryFormat.getInsertSql(), Locale.ROOT).format(new Object[] {tableName, META_TABLE_KEY, META_TABLE_TS,
- META_TABLE_CONTENT}, new StringBuffer(), new FieldPosition(0)).toString();
+ String sql = new MessageFormat(jdbcSqlQueryFormat.getInsertSql(), Locale.ROOT)
+ .format(new Object[] { tableName, META_TABLE_KEY, META_TABLE_TS, META_TABLE_CONTENT },
+ new StringBuffer(), new FieldPosition(0))
+ .toString();
return sql;
}
@SuppressWarnings("unused")
private String getReplaceSqlWithoutContent(String tableName) {
- String sql = new MessageFormat(jdbcSqlQueryFormat.getReplaceSqlWithoutContent(), Locale.ROOT).format(new Object[] {tableName, META_TABLE_TS,
- META_TABLE_KEY}, new StringBuffer(), new FieldPosition(0)).toString();
+ String sql = new MessageFormat(jdbcSqlQueryFormat.getReplaceSqlWithoutContent(), Locale.ROOT)
+ .format(new Object[] { tableName, META_TABLE_TS, META_TABLE_KEY }, new StringBuffer(),
+ new FieldPosition(0))
+ .toString();
return sql;
}
private String getInsertSqlWithoutContent(String tableName) {
- String sql = new MessageFormat(jdbcSqlQueryFormat.getInsertSqlWithoutContent(), Locale.ROOT).format(new Object[] {tableName, META_TABLE_KEY,
- META_TABLE_TS}, new StringBuffer(), new FieldPosition(0)).toString();
+ String sql = new MessageFormat(jdbcSqlQueryFormat.getInsertSqlWithoutContent(), Locale.ROOT)
+ .format(new Object[] { tableName, META_TABLE_KEY, META_TABLE_TS }, new StringBuffer(),
+ new FieldPosition(0))
+ .toString();
return sql;
}
private String getUpdateSqlWithoutContent(String tableName) {
- String sql = new MessageFormat(jdbcSqlQueryFormat.getUpdateSqlWithoutContent(), Locale.ROOT).format(new Object[] {tableName, META_TABLE_TS,
- META_TABLE_KEY, META_TABLE_TS}, new StringBuffer(), new FieldPosition(0)).toString();
+ String sql = new MessageFormat(jdbcSqlQueryFormat.getUpdateSqlWithoutContent(), Locale.ROOT)
+ .format(new Object[] { tableName, META_TABLE_TS, META_TABLE_KEY, META_TABLE_TS }, new StringBuffer(),
+ new FieldPosition(0))
+ .toString();
return sql;
}
private String getUpdateContentSql(String tableName) {
- String sql = new MessageFormat(jdbcSqlQueryFormat.getUpdateContentSql(), Locale.ROOT).format(new Object[] {tableName, META_TABLE_CONTENT,
- META_TABLE_KEY}, new StringBuffer(), new FieldPosition(0)).toString();
+ String sql = new MessageFormat(jdbcSqlQueryFormat.getUpdateContentSql(), Locale.ROOT)
+ .format(new Object[] { tableName, META_TABLE_CONTENT, META_TABLE_KEY }, new StringBuffer(),
+ new FieldPosition(0))
+ .toString();
return sql;
}
@@ -576,8 +608,7 @@ public class JDBCResourceDAO {
return inputStream;
} else {
Path redirectPath = bigCellHDFSPath(resPath);
- FileSystem fileSystem = HadoopUtil.getFileSystem(redirectPath);
- return fileSystem.open(redirectPath);
+ return redirectFileSystem.open(redirectPath);
}
}
@@ -587,21 +618,15 @@ public class JDBCResourceDAO {
FSDataOutputStream out = null;
Path redirectPath = bigCellHDFSPath(resPath);
Path oldPath = new Path(redirectPath.toString() + "_old");
- FileSystem fileSystem = null;
try {
- fileSystem = HadoopUtil.getFileSystem(redirectPath);
- } catch (IOException e) {
- e.printStackTrace();
- }
- try {
- isResourceExist = fileSystem.exists(redirectPath);
+ isResourceExist = redirectFileSystem.exists(redirectPath);
if (isResourceExist) {
- FileUtil.copy(fileSystem, redirectPath, fileSystem, oldPath, false,
+ FileUtil.copy(redirectFileSystem, redirectPath, redirectFileSystem, oldPath, false,
HadoopUtil.getCurrentConfiguration());
- fileSystem.delete(redirectPath, true);
+ redirectFileSystem.delete(redirectPath, true);
logger.debug("a copy of hdfs file {} is made", redirectPath);
}
- out = fileSystem.create(redirectPath);
+ out = redirectFileSystem.create(redirectPath);
out.write(largeColumn);
return redirectPath;
} catch (Throwable e) {
@@ -619,28 +644,22 @@ public class JDBCResourceDAO {
public void rollbackLargeCellFromHdfs(String resPath) throws SQLException {
Path redirectPath = bigCellHDFSPath(resPath);
Path oldPath = new Path(redirectPath.toString() + "_old");
- FileSystem fileSystem = null;
- try {
- fileSystem = HadoopUtil.getFileSystem(redirectPath);
- } catch (IOException e) {
- e.printStackTrace();
- }
try {
- if (fileSystem.exists(oldPath)) {
- FileUtil.copy(fileSystem, oldPath, fileSystem, redirectPath, true, true,
+ if (redirectFileSystem.exists(oldPath)) {
+ FileUtil.copy(redirectFileSystem, oldPath, redirectFileSystem, redirectPath, true, true,
HadoopUtil.getCurrentConfiguration());
logger.info("roll back hdfs file {}", resPath);
} else {
- fileSystem.delete(redirectPath, true);
+ redirectFileSystem.delete(redirectPath, true);
logger.warn("no backup for hdfs file {} is found, clean it", resPath);
}
} catch (Throwable e) {
try {
//last try to delete redirectPath, because we prefer a deleted rather than incomplete
- fileSystem.delete(redirectPath, true);
- } catch (Throwable ignore) {
- // ignore it
+ redirectFileSystem.delete(redirectPath, true);
+ } catch (Throwable ex) {
+ logger.error("fail to delete resource " + redirectPath + " in hdfs", ex);
}
throw new SQLException(e);
@@ -650,15 +669,9 @@ public class JDBCResourceDAO {
private void cleanOldLargeCellFromHdfs(String resPath) throws SQLException {
Path redirectPath = bigCellHDFSPath(resPath);
Path oldPath = new Path(redirectPath.toString() + "_old");
- FileSystem fileSystem = null;
- try {
- fileSystem = HadoopUtil.getFileSystem(redirectPath);
- } catch (IOException e) {
- e.printStackTrace();
- }
try {
- if (fileSystem.exists(oldPath)) {
- fileSystem.delete(oldPath, true);
+ if (redirectFileSystem.exists(oldPath)) {
+ redirectFileSystem.delete(oldPath, true);
}
} catch (Throwable e) {
logger.warn("error cleaning the backup file for " + redirectPath + ", leave it as garbage", e);
@@ -666,8 +679,9 @@ public class JDBCResourceDAO {
}
public Path bigCellHDFSPath(String resPath) {
- String metastoreBigCellHdfsDirectory = this.kylinConfig.getMetastoreBigCellHdfsDirectory();
- Path redirectPath = new Path(metastoreBigCellHdfsDirectory, "resources-jdbc" + resPath);
+ String hdfsWorkingDirectory = this.kylinConfig.getHdfsWorkingDirectory();
+ Path redirectPath = new Path(hdfsWorkingDirectory, "resources-jdbc" + resPath);
+ redirectPath = Path.getPathWithoutSchemeAndAuthority(redirectPath);
return redirectPath;
}
@@ -675,14 +689,18 @@ public class JDBCResourceDAO {
return queriedSqlNum;
}
+ /**
+ * Persist metadata to different SQL tables
+ * @param resPath the metadata path key
+ * @return the table name
+ */
public String getMetaTableName(String resPath) {
- if (resPath.startsWith(ResourceStore.BAD_QUERY_RESOURCE_ROOT) || resPath.startsWith(ResourceStore.CUBE_STATISTICS_ROOT)
- || resPath.startsWith(ResourceStore.DICT_RESOURCE_ROOT) || resPath.startsWith(ResourceStore.EXECUTE_RESOURCE_ROOT)
- || resPath.startsWith(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT) || resPath.startsWith(ResourceStore.EXT_SNAPSHOT_RESOURCE_ROOT)
+ if (resPath.startsWith(ResourceStore.BAD_QUERY_RESOURCE_ROOT)
+ || resPath.startsWith(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT)
|| resPath.startsWith(ResourceStore.TEMP_STATMENT_RESOURCE_ROOT)) {
- return tablesName[1];
+ return tableNames[1];
} else {
- return tablesName[0];
+ return tableNames[0];
}
}
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java
index b62b33f..ea6e231 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java
@@ -18,12 +18,6 @@
package org.apache.kylin.common.persistence;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.commons.io.IOUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.StorageURL;
-
import java.io.IOException;
import java.io.InputStream;
import java.sql.SQLException;
@@ -31,11 +25,18 @@ import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;
+import org.apache.commons.io.IOUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.StorageURL;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
public class JDBCResourceStore extends ResourceStore {
private static final String JDBC_SCHEME = "jdbc";
- private String[] tablesName = new String[2];
+ private String metadataIdentifier;
private JDBCResourceDAO resourceDAO;
@@ -43,9 +44,8 @@ public class JDBCResourceStore extends ResourceStore {
super(kylinConfig);
StorageURL metadataUrl = kylinConfig.getMetadataUrl();
checkScheme(metadataUrl);
- tablesName[0] = metadataUrl.getIdentifier();
- tablesName[1] = metadataUrl.getIdentifier() + "1";
- this.resourceDAO = new JDBCResourceDAO(kylinConfig, tablesName);
+ metadataIdentifier = metadataUrl.getIdentifier();
+ this.resourceDAO = new JDBCResourceDAO(kylinConfig, metadataUrl.getIdentifier());
}
@Override
@@ -106,7 +106,7 @@ public class JDBCResourceStore extends ResourceStore {
@Override
protected List<RawResource> getAllResourcesImpl(String folderPath, long timeStart, long timeEndExclusive,
- final boolean isAllowBroken) throws IOException {
+ final boolean isAllowBroken) throws IOException {
final List<RawResource> result = Lists.newArrayList();
try {
List<JDBCResource> allResource = resourceDAO.getAllResource(makeFolderPath(folderPath), timeStart,
@@ -155,7 +155,7 @@ public class JDBCResourceStore extends ResourceStore {
@Override
protected String getReadableResourcePathImpl(String resPath) {
- return tablesName + "(key='" + resPath + "')@" + kylinConfig.getMetadataUrl();
+ return metadataIdentifier + "(key='" + resPath + "')@" + kylinConfig.getMetadataUrl();
}
private String makeFolderPath(String folderPath) {
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index 7bc7387..152d506 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -85,13 +85,7 @@ public class HadoopUtil {
}
public static FileSystem getReadFileSystem() throws IOException {
- Configuration conf = getCurrentConfiguration();
- return getReadFileSystem(conf);
- }
-
- public static FileSystem getReadFileSystem(Configuration conf) throws IOException {
- Path parquetReadPath = new Path(KylinConfig.getInstanceFromEnv().getReadHdfsWorkingDirectory(null));
- return getFileSystem(parquetReadPath, conf);
+ return getFileSystem(KylinConfig.getInstanceFromEnv().getReadHdfsWorkingDirectory());
}
public static FileSystem getFileSystem(String path) throws IOException {
diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/jdbc/ITJDBCResourceStoreTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/jdbc/ITJDBCResourceStoreTest.java
index c7c33ac..75c93f0 100644
--- a/kylin-it/src/test/java/org/apache/kylin/storage/jdbc/ITJDBCResourceStoreTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/storage/jdbc/ITJDBCResourceStoreTest.java
@@ -88,9 +88,11 @@ public class ITJDBCResourceStoreTest extends HBaseMetadataTestCase {
connectionManager = JDBCConnectionManager.getConnectionManager();
conn = connectionManager.getConn();
statement = conn.createStatement();
- String sql = new MessageFormat(sqlQueryFormat.getTestDropSql(), Locale.ROOT).format(mainIdentifier, new StringBuffer(), new FieldPosition(0)).toString();
+ String sql = new MessageFormat(sqlQueryFormat.getTestDropSql(), Locale.ROOT)
+ .format(mainIdentifier, new StringBuffer(), new FieldPosition(0)).toString();
statement.executeUpdate(sql);
- sql = new MessageFormat(sqlQueryFormat.getTestDropSql(), Locale.ROOT).format(copyIdentifier, new StringBuffer(), new FieldPosition(0)).toString();
+ sql = new MessageFormat(sqlQueryFormat.getTestDropSql(), Locale.ROOT)
+ .format(copyIdentifier, new StringBuffer(), new FieldPosition(0)).toString();
statement.executeUpdate(sql);
jdbcConnectable = true;
ResourceTool.copy(configBackup, kylinConfig);
@@ -139,13 +141,6 @@ public class ITJDBCResourceStoreTest extends HBaseMetadataTestCase {
}
}
- // Support other db except mysql
- // @Test
- // public void testGetDbcpProperties() {
- // Properties prop = JDBCConnectionManager.getConnectionManager().getDbcpProperties();
- // assertEquals("com.mysql.jdbc.Driver", prop.get("driverClassName"));
- // }
-
@Test
public void testMsgFormatter() {
System.out.println(MessageFormatter.format("{}:{}", "a", "b"));