Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/01/24 12:24:12 UTC
[carbondata] branch master updated: [HOTFIX] presto carbon doesn't work with Hadoop conf in cluster.
This is an automated email from the ASF dual-hosted git repository.
ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new d63b2c0 [HOTFIX] presto carbon doesn't work with Hadoop conf in cluster.
d63b2c0 is described below
commit d63b2c0cacec7509471daaaa9ffc0f2e1d1897f5
Author: ajantha-bhat <aj...@gmail.com>
AuthorDate: Mon Jan 21 17:47:22 2019 +0530
[HOTFIX] presto carbon doesn't work with Hadoop conf in cluster.
Problem: Presto carbon doesn't work with Hadoop conf in cluster.
Cause: When Presto queries are run in a cluster, they fail with the message below:
    IllegalArgumentException java.net.UnknownHostException: hacluster
The configuration from hdfsEnvironment is not used while checking the schema path, so the file factory throws this exception.
Solution: Set the configuration while checking the schema path and in other places in Presto.
This closes #3089
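
For illustration, a minimal sketch of how a caller passes an explicit Hadoop Configuration to the overload this commit adds. The fs.defaultFS value, table path, and class name are placeholders, not part of the commit:

    // Hypothetical usage sketch: resolve the schema path with an explicit
    // Hadoop Configuration so HA nameservice IDs such as "hacluster" can be
    // resolved, instead of falling back to the process default configuration.
    import org.apache.carbondata.core.util.path.CarbonTablePath;
    import org.apache.hadoop.conf.Configuration;

    public class SchemaPathExample {
      public static void main(String[] args) {
        Configuration hadoopConf = new Configuration();
        // In Presto this configuration would come from hdfsEnvironment and
        // carry the HDFS HA settings; the value below is a placeholder.
        hadoopConf.set("fs.defaultFS", "hdfs://hacluster");
        String tablePath = "hdfs://hacluster/user/hive/warehouse/carbon.store/default/t1";
        // New overload added by this commit: the configuration is threaded
        // through to FileFactory rather than the default being used.
        String schemaFilePath = CarbonTablePath.getSchemaFilePath(tablePath, hadoopConf);
        System.out.println(schemaFilePath);
      }
    }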
---
.../carbondata/core/util/path/CarbonTablePath.java | 23 ++++-
.../carbondata/presto/impl/CarbonTableReader.java | 105 +++++++++++----------
2 files changed, 76 insertions(+), 52 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index 8538e37..da558be 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -25,6 +25,8 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.locks.LockUsage;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.hadoop.conf.Configuration;
+
/**
* Helps to get Table content paths.
*/
@@ -175,12 +177,27 @@ public class CarbonTablePath {
* @return schema file path
*/
public static String getSchemaFilePath(String tablePath) {
- return getActualSchemaFilePath(tablePath);
+ return getActualSchemaFilePath(tablePath, null);
+ }
+
+ /**
+ * return the schema file path
+ * @param tablePath path to table files
+ * @param hadoopConf hadoop configuration instance
+ * @return schema file path
+ */
+ public static String getSchemaFilePath(String tablePath, Configuration hadoopConf) {
+ return getActualSchemaFilePath(tablePath, hadoopConf);
}
- private static String getActualSchemaFilePath(String tablePath) {
+ private static String getActualSchemaFilePath(String tablePath, Configuration hadoopConf) {
String metaPath = tablePath + CarbonCommonConstants.FILE_SEPARATOR + METADATA_DIR;
- CarbonFile carbonFile = FileFactory.getCarbonFile(metaPath);
+ CarbonFile carbonFile;
+ if (hadoopConf != null) {
+ carbonFile = FileFactory.getCarbonFile(metaPath, hadoopConf);
+ } else {
+ carbonFile = FileFactory.getCarbonFile(metaPath);
+ }
CarbonFile[] schemaFile = carbonFile.listFiles(new CarbonFileFilter() {
@Override public boolean accept(CarbonFile file) {
return file.getName().startsWith(SCHEMA_FILE);
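
The null guard in getActualSchemaFilePath above keeps the legacy single-argument entry point working unchanged. A minimal standalone sketch of that delegation pattern, with hypothetical names (this is not CarbonData API):

    // Hypothetical sketch of the overload-with-null-guard pattern used above:
    // the new overload threads an optional Configuration through, while the
    // legacy entry point delegates with null and keeps its old behaviour.
    import org.apache.hadoop.conf.Configuration;

    final class PathResolver {
      static String resolve(String path) {
        return resolve(path, null); // legacy callers keep working
      }

      static String resolve(String path, Configuration conf) {
        if (conf != null) {
          // configuration-aware lookup, analogous to
          // FileFactory.getCarbonFile(path, conf)
          return path + " [explicit conf]";
        }
        // fall back to the process-wide default, analogous to
        // FileFactory.getCarbonFile(path)
        return path + " [default conf]";
      }
    }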
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index 1121a37..916e44c 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -168,60 +168,59 @@ public class CarbonTableReader {
private CarbonTableCacheModel parseCarbonMetadata(SchemaTableName table, String tablePath,
Configuration config) {
try {
- CarbonTableCacheModel cache = carbonCache.get().get(table);
- if (cache != null && cache.isValid()) {
+ CarbonTableCacheModel cache = getValidCacheBySchemaTableName(table);
+ if (cache != null) {
return cache;
}
- // Step 1: get store path of the table and cache it.
- String schemaFilePath = CarbonTablePath.getSchemaFilePath(tablePath);
- // If metadata folder exists, it is a transactional table
- CarbonFile schemaFile = FileFactory.getCarbonFile(schemaFilePath, config);
- boolean isTransactionalTable = schemaFile.exists();
- org.apache.carbondata.format.TableInfo tableInfo;
- long modifiedTime = System.currentTimeMillis();
- if (isTransactionalTable) {
- //Step 2: read the metadata (tableInfo) of the table.
- ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
- // TBase is used to read and write thrift objects.
- // TableInfo is a kind of TBase used to read and write table information.
- // TableInfo is generated by thrift,
- // see schema.thrift under format/src/main/thrift for details.
- public TBase create() {
- return new org.apache.carbondata.format.TableInfo();
- }
- };
- ThriftReader thriftReader =
- new ThriftReader(schemaFilePath, createTBase, config);
- thriftReader.open();
- tableInfo = (org.apache.carbondata.format.TableInfo) thriftReader.read();
- thriftReader.close();
- modifiedTime = schemaFile.getLastModifiedTime();
- } else {
- tableInfo = CarbonUtil
- .inferSchema(tablePath, table.getTableName(), false, config);
- }
- // Step 3: convert format level TableInfo to code level TableInfo
- SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
- // wrapperTableInfo is the code level information of a table in carbondata core,
- // different from the Thrift TableInfo.
- TableInfo wrapperTableInfo = schemaConverter
- .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
- tablePath);
-
- wrapperTableInfo.setTransactionalTable(isTransactionalTable);
-
- CarbonMetadata.getInstance().removeTable(wrapperTableInfo.getTableUniqueName());
- // Step 4: Load metadata info into CarbonMetadata
- CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
- CarbonTable carbonTable = Objects.requireNonNull(
- CarbonMetadata.getInstance().getCarbonTable(table.getSchemaName(), table.getTableName()),
- "carbontable is null");
- // If table is not previously cached, then:
- if (cache == null) {
+ // multiple tasks can be launched in a worker concurrently. Hence need to synchronize this.
+ synchronized (this) {
+ // cache might be filled by another thread, so if filled use that cache.
+ CarbonTableCacheModel cacheModel = getValidCacheBySchemaTableName(table);
+ if (cacheModel != null) {
+ return cacheModel;
+ }
+ // Step 1: get store path of the table and cache it.
+ String schemaFilePath = CarbonTablePath.getSchemaFilePath(tablePath, config);
+ // If metadata folder exists, it is a transactional table
+ CarbonFile schemaFile = FileFactory.getCarbonFile(schemaFilePath, config);
+ boolean isTransactionalTable = schemaFile.exists();
+ org.apache.carbondata.format.TableInfo tableInfo;
+ long modifiedTime = System.currentTimeMillis();
+ if (isTransactionalTable) {
+ //Step 2: read the metadata (tableInfo) of the table.
+ ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
+ // TBase is used to read and write thrift objects.
+ // TableInfo is a kind of TBase used to read and write table information.
+ // TableInfo is generated by thrift,
+ // see schema.thrift under format/src/main/thrift for details.
+ public TBase create() {
+ return new org.apache.carbondata.format.TableInfo();
+ }
+ };
+ ThriftReader thriftReader = new ThriftReader(schemaFilePath, createTBase, config);
+ thriftReader.open();
+ tableInfo = (org.apache.carbondata.format.TableInfo) thriftReader.read();
+ thriftReader.close();
+ modifiedTime = schemaFile.getLastModifiedTime();
+ } else {
+ tableInfo = CarbonUtil.inferSchema(tablePath, table.getTableName(), false, config);
+ }
+ // Step 3: convert format level TableInfo to code level TableInfo
+ SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+ // wrapperTableInfo is the code level information of a table in carbondata core,
+ // different from the Thrift TableInfo.
+ TableInfo wrapperTableInfo = schemaConverter
+ .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
+ tablePath);
+ wrapperTableInfo.setTransactionalTable(isTransactionalTable);
+ CarbonMetadata.getInstance().removeTable(wrapperTableInfo.getTableUniqueName());
+ // Step 4: Load metadata info into CarbonMetadata
+ CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
+ CarbonTable carbonTable = Objects.requireNonNull(CarbonMetadata.getInstance()
+ .getCarbonTable(table.getSchemaName(), table.getTableName()), "carbontable is null");
cache = new CarbonTableCacheModel(modifiedTime, carbonTable);
// cache the table
carbonCache.get().put(table, cache);
- } else {
cache.setCarbonTable(carbonTable);
}
return cache;
@@ -230,6 +229,14 @@ public class CarbonTableReader {
}
}
+ private CarbonTableCacheModel getValidCacheBySchemaTableName(SchemaTableName schemaTableName) {
+ CarbonTableCacheModel cache = carbonCache.get().get(schemaTableName);
+ if (cache != null && cache.isValid()) {
+ return cache;
+ }
+ return null;
+ }
+
public List<CarbonLocalMultiBlockSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
Expression filters, TupleDomain<HiveColumnHandle> constraints, Configuration config)
throws IOException {
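
The synchronized block added to parseCarbonMetadata above implements a check/lock/re-check pattern so that concurrent tasks in a worker build the table metadata only once. A minimal generic sketch of the same pattern, with hypothetical names (this is not CarbonData API):

    // Hypothetical sketch of the check/lock/re-check caching pattern used by
    // parseCarbonMetadata: the fast path reads the cache without locking, and
    // the lock is taken only to load a missing entry exactly once.
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.Function;

    final class MetadataCache<K, V> {
      private final Map<K, V> cache = new ConcurrentHashMap<>();

      V getOrLoad(K key, Function<K, V> loader) {
        V value = cache.get(key); // fast path: no lock when already cached
        if (value != null) {
          return value;
        }
        synchronized (this) {
          value = cache.get(key); // re-check: another thread may have
          if (value != null) {    // filled the cache while we waited
            return value;
          }
          value = loader.apply(key); // expensive load, done exactly once
          cache.put(key, value);
          return value;
        }
      }
    }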