Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/01/30 10:39:16 UTC

[carbondata] 12/27: [HOTFIX] presto carbon doesn't work with Hadoop conf in cluster.

This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit 39c50d47f63614a56b91116cbe9d3e2304aa3243
Author: ajantha-bhat <aj...@gmail.com>
AuthorDate: Mon Jan 21 17:47:22 2019 +0530

    [HOTFIX] presto carbon doesn't work with Hadoop conf in cluster.
    
    Problem: Presto carbon doesn't work with Hadoop conf in a cluster.
    
    Cause:
    When Presto queries are run in a cluster, they fail with the message below:
    IllegalArgumentException: java.net.UnknownHostException: hacluster
    The configuration from hdfsEnvironment is not used while checking the schema path, so the file factory throws this exception.
    
    Solution: Set the configuration while checking the schema path and in other places in Presto.
    
    This closes #3089
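    
    Background: under HDFS high availability, "hacluster" is a logical nameservice that
    only a properly populated Hadoop Configuration can resolve; a default Configuration
    does not know it and fails exactly as above. A minimal sketch of that failure mode
    (hostnames, ports and paths below are illustrative, not from this commit):
    
        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;
    
        public class HaConfSketch {
          public static void main(String[] args) throws Exception {
            // HDFS HA keys that normally come from hdfs-site.xml.
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://hacluster");
            conf.set("dfs.nameservices", "hacluster");
            conf.set("dfs.ha.namenodes.hacluster", "nn1,nn2");
            conf.set("dfs.namenode.rpc-address.hacluster.nn1", "host1:8020");
            conf.set("dfs.namenode.rpc-address.hacluster.nn2", "host2:8020");
            conf.set("dfs.client.failover.proxy.provider.hacluster",
                "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
    
            Path metaDir = new Path("hdfs://hacluster/user/warehouse/t1/Metadata");
            // Works: the HA mapping above tells the client what "hacluster" means.
            FileSystem fs = metaDir.getFileSystem(conf);
            // Fails: a fresh Configuration() knows nothing about "hacluster" and throws
            // IllegalArgumentException: java.net.UnknownHostException: hacluster.
            // FileSystem broken = metaDir.getFileSystem(new Configuration());
          }
        }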
---
 .../carbondata/core/util/path/CarbonTablePath.java |  23 ++++-
 .../carbondata/presto/impl/CarbonTableReader.java  | 105 +++++++++++----------
 2 files changed, 76 insertions(+), 52 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index 8538e37..da558be 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -25,6 +25,8 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.locks.LockUsage;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * Helps to get Table content paths.
  */
@@ -175,12 +177,27 @@ public class CarbonTablePath {
    * @return schema file path
    */
   public static String getSchemaFilePath(String tablePath) {
-    return getActualSchemaFilePath(tablePath);
+    return getActualSchemaFilePath(tablePath, null);
+  }
+
+  /**
+   * return the schema file path
+   * @param tablePath path to table files
+   * @param hadoopConf hadoop configuration instance
+   * @return schema file path
+   */
+  public static String getSchemaFilePath(String tablePath, Configuration hadoopConf) {
+    return getActualSchemaFilePath(tablePath, hadoopConf);
   }
 
-  private static String getActualSchemaFilePath(String tablePath) {
+  private static String getActualSchemaFilePath(String tablePath, Configuration hadoopConf) {
     String metaPath = tablePath + CarbonCommonConstants.FILE_SEPARATOR + METADATA_DIR;
-    CarbonFile carbonFile = FileFactory.getCarbonFile(metaPath);
+    CarbonFile carbonFile;
+    if (hadoopConf != null) {
+      carbonFile = FileFactory.getCarbonFile(metaPath, hadoopConf);
+    } else {
+      carbonFile = FileFactory.getCarbonFile(metaPath);
+    }
     CarbonFile[] schemaFile = carbonFile.listFiles(new CarbonFileFilter() {
       @Override public boolean accept(CarbonFile file) {
         return file.getName().startsWith(SCHEMA_FILE);
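
For context, a caller that already holds a Hadoop Configuration (as the Presto reader
below does) can route it through the new overload. A minimal usage sketch, with a
hypothetical table path:

    import org.apache.carbondata.core.util.path.CarbonTablePath;
    import org.apache.hadoop.conf.Configuration;

    class SchemaPathUsage {
      static String resolve(Configuration hadoopConf) {
        String tablePath = "hdfs://hacluster/user/warehouse/t1"; // hypothetical
        // The one-argument getSchemaFilePath(tablePath) keeps the old behaviour
        // (a null configuration); the two-argument overload resolves
        // <tablePath>/Metadata with the caller's configuration instead.
        return CarbonTablePath.getSchemaFilePath(tablePath, hadoopConf);
      }
    }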
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index 1121a37..916e44c 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -168,60 +168,59 @@ public class CarbonTableReader {
   private CarbonTableCacheModel parseCarbonMetadata(SchemaTableName table, String tablePath,
       Configuration config) {
     try {
-      CarbonTableCacheModel cache = carbonCache.get().get(table);
-      if (cache != null && cache.isValid()) {
+      CarbonTableCacheModel cache = getValidCacheBySchemaTableName(table);
+      if (cache != null) {
         return cache;
       }
-      // Step 1: get store path of the table and cache it.
-      String schemaFilePath = CarbonTablePath.getSchemaFilePath(tablePath);
-      // If metadata folder exists, it is a transactional table
-      CarbonFile schemaFile = FileFactory.getCarbonFile(schemaFilePath, config);
-      boolean isTransactionalTable = schemaFile.exists();
-      org.apache.carbondata.format.TableInfo tableInfo;
-      long modifiedTime = System.currentTimeMillis();
-      if (isTransactionalTable) {
-        //Step 2: read the metadata (tableInfo) of the table.
-        ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
-          // TBase is used to read and write thrift objects.
-          // TableInfo is a kind of TBase used to read and write table information.
-          // TableInfo is generated by thrift,
-          // see schema.thrift under format/src/main/thrift for details.
-          public TBase create() {
-            return new org.apache.carbondata.format.TableInfo();
-          }
-        };
-        ThriftReader thriftReader =
-            new ThriftReader(schemaFilePath, createTBase, config);
-        thriftReader.open();
-        tableInfo = (org.apache.carbondata.format.TableInfo) thriftReader.read();
-        thriftReader.close();
-        modifiedTime = schemaFile.getLastModifiedTime();
-      } else {
-        tableInfo = CarbonUtil
-            .inferSchema(tablePath, table.getTableName(), false, config);
-      }
-      // Step 3: convert format level TableInfo to code level TableInfo
-      SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
-      // wrapperTableInfo is the code level information of a table in carbondata core,
-      // different from the Thrift TableInfo.
-      TableInfo wrapperTableInfo = schemaConverter
-          .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
-              tablePath);
-
-      wrapperTableInfo.setTransactionalTable(isTransactionalTable);
-
-      CarbonMetadata.getInstance().removeTable(wrapperTableInfo.getTableUniqueName());
-      // Step 4: Load metadata info into CarbonMetadata
-      CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
-      CarbonTable carbonTable = Objects.requireNonNull(
-          CarbonMetadata.getInstance().getCarbonTable(table.getSchemaName(), table.getTableName()),
-          "carbontable is null");
-      // If table is not previously cached, then:
-      if (cache == null) {
+      // multiple tasks can be launched in a worker concurrently. Hence need to synchronize this.
+      synchronized (this) {
+        // cache might be filled by another thread, so if filled use that cache.
+        CarbonTableCacheModel cacheModel = getValidCacheBySchemaTableName(table);
+        if (cacheModel != null) {
+          return cacheModel;
+        }
+        // Step 1: get store path of the table and cache it.
+        String schemaFilePath = CarbonTablePath.getSchemaFilePath(tablePath, config);
+        // If metadata folder exists, it is a transactional table
+        CarbonFile schemaFile = FileFactory.getCarbonFile(schemaFilePath, config);
+        boolean isTransactionalTable = schemaFile.exists();
+        org.apache.carbondata.format.TableInfo tableInfo;
+        long modifiedTime = System.currentTimeMillis();
+        if (isTransactionalTable) {
+          //Step 2: read the metadata (tableInfo) of the table.
+          ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
+            // TBase is used to read and write thrift objects.
+            // TableInfo is a kind of TBase used to read and write table information.
+            // TableInfo is generated by thrift,
+            // see schema.thrift under format/src/main/thrift for details.
+            public TBase create() {
+              return new org.apache.carbondata.format.TableInfo();
+            }
+          };
+          ThriftReader thriftReader = new ThriftReader(schemaFilePath, createTBase, config);
+          thriftReader.open();
+          tableInfo = (org.apache.carbondata.format.TableInfo) thriftReader.read();
+          thriftReader.close();
+          modifiedTime = schemaFile.getLastModifiedTime();
+        } else {
+          tableInfo = CarbonUtil.inferSchema(tablePath, table.getTableName(), false, config);
+        }
+        // Step 3: convert format level TableInfo to code level TableInfo
+        SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+        // wrapperTableInfo is the code level information of a table in carbondata core,
+        // different from the Thrift TableInfo.
+        TableInfo wrapperTableInfo = schemaConverter
+            .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
+                tablePath);
+        wrapperTableInfo.setTransactionalTable(isTransactionalTable);
+        CarbonMetadata.getInstance().removeTable(wrapperTableInfo.getTableUniqueName());
+        // Step 4: Load metadata info into CarbonMetadata
+        CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
+        CarbonTable carbonTable = Objects.requireNonNull(CarbonMetadata.getInstance()
+            .getCarbonTable(table.getSchemaName(), table.getTableName()), "carbontable is null");
         cache = new CarbonTableCacheModel(modifiedTime, carbonTable);
         // cache the table
         carbonCache.get().put(table, cache);
-      } else {
         cache.setCarbonTable(carbonTable);
       }
       return cache;
@@ -230,6 +229,14 @@ public class CarbonTableReader {
     }
   }
 
+  private CarbonTableCacheModel getValidCacheBySchemaTableName(SchemaTableName schemaTableName) {
+    CarbonTableCacheModel cache = carbonCache.get().get(schemaTableName);
+    if (cache != null && cache.isValid()) {
+      return cache;
+    }
+    return null;
+  }
+
   public List<CarbonLocalMultiBlockSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
       Expression filters, TupleDomain<HiveColumnHandle> constraints, Configuration config)
       throws IOException {
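
The cache handling added above follows the check-lock-recheck idiom: a lock-free fast
path via getValidCacheBySchemaTableName, then a synchronized slow path that re-checks
before doing the expensive schema read. A generic sketch of the same shape (names and
types are illustrative, not CarbonData API):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.Function;

    class CheckLockRecheckCache<K, V> {
      private final Map<K, V> cache = new ConcurrentHashMap<>();

      V getOrBuild(K key, Function<K, V> builder) {
        V value = cache.get(key);      // fast path, no lock
        if (value != null) {
          return value;
        }
        synchronized (this) {          // slow path: only one thread builds
          value = cache.get(key);      // another thread may have filled it
          if (value != null) {
            return value;
          }
          value = builder.apply(key);  // e.g. read schema file, load metadata
          cache.put(key, value);
          return value;
        }
      }
    }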