Posted to commits@carbondata.apache.org by ma...@apache.org on 2018/02/02 06:36:57 UTC

carbondata git commit: [Compatibility] Added changes for backward compatibility

Repository: carbondata
Updated Branches:
  refs/heads/master 1b224a4a9 -> 02eefca15


[Compatibility] Added changes for backward compatibility

This PR fixes compatibility issues between the old and new store versions.
Issues fixed:
1. The schema file name was different in one of the previous versions.
2. The bucket number was not supported in previous versions.
3. Table parameters were stored in lower case, while the current version reads
   them in camel case (illustrated in the sketch below).

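As a minimal sketch of issue 3 (the keys and values here are hypothetical; the
real mapping is restored by CarbonScalaUtil.getDeserializedParameters in the
diff below):

    // Parameters as written by an old store: all keys are lower case.
    val oldStoreParams = Map(
      "tablename" -> "t1",
      "tablepath" -> "/store/default/t1")
    // The current code looks keys up in camel case, so these lookups fail
    // against an old-store map until the parameters are deserialized.
    oldStoreParams.get("tableName") // => None
    oldStoreParams.get("tablePath") // => None
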
This closes #1747


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/02eefca1
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/02eefca1
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/02eefca1

Branch: refs/heads/master
Commit: 02eefca15862a8667d53e247272afb68efe7af60
Parents: 1b224a4
Author: kunal642 <ku...@gmail.com>
Authored: Mon Nov 20 20:36:54 2017 +0530
Committer: manishgupta88 <to...@gmail.com>
Committed: Fri Feb 2 12:08:50 2018 +0530

----------------------------------------------------------------------
 .../core/util/path/CarbonTablePath.java         | 22 ++++++++-
 .../carbondata/spark/util/CarbonScalaUtil.scala | 47 ++++++++++++++++++++
 .../org/apache/spark/sql/CarbonSource.scala     | 33 ++++++++------
 3 files changed, 87 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/02eefca1/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index fab6289..d8c64c4 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -233,7 +233,7 @@ public class CarbonTablePath extends Path {
    * @return absolute path of schema file
    */
   public String getSchemaFilePath() {
-    return getMetaDataDir() + File.separator + SCHEMA_FILE;
+    return getActualSchemaFilePath(tablePath);
   }
 
   /**
@@ -242,7 +242,22 @@ public class CarbonTablePath extends Path {
    * @return schema file path
    */
   public static String getSchemaFilePath(String tablePath) {
-    return tablePath + File.separator + METADATA_DIR + File.separator + SCHEMA_FILE;
+    return getActualSchemaFilePath(tablePath);
+  }
+
+  private static String getActualSchemaFilePath(String tablePath) {
+    String metaPath = tablePath + CarbonCommonConstants.FILE_SEPARATOR + METADATA_DIR;
+    CarbonFile carbonFile = FileFactory.getCarbonFile(metaPath);
+    CarbonFile[] schemaFile = carbonFile.listFiles(new CarbonFileFilter() {
+      @Override public boolean accept(CarbonFile file) {
+        return file.getName().startsWith(SCHEMA_FILE);
+      }
+    });
+    if (schemaFile != null && schemaFile.length > 0) {
+      return schemaFile[0].getAbsolutePath();
+    } else {
+      return metaPath + CarbonCommonConstants.FILE_SEPARATOR + SCHEMA_FILE;
+    }
   }
 
   /**
@@ -351,6 +366,9 @@ public class CarbonTablePath extends Path {
 
   private static String getCarbonIndexFileName(String taskNo, int bucketNumber,
       String factUpdatedtimeStamp) {
+    // A bucket number of -1 indicates a non-bucketed table; previous versions
+    // wrote index file names without the bucket number, so keep that format.
+    if (bucketNumber == -1) {
+      return taskNo + "-" + factUpdatedtimeStamp + INDEX_FILE_EXT;
+    }
     return taskNo + "-" + bucketNumber + "-" + factUpdatedtimeStamp + INDEX_FILE_EXT;
   }
 
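The fallback above exists because an older version wrote the schema file under
a different name that still begins with "schema", so the metadata directory is
listed first and the default path is used only when nothing matches. A minimal
sketch of the same lookup, using plain java.io.File in place of Carbon's
FileFactory/CarbonFile abstraction and assuming the usual "Metadata" directory
and "schema" file names:

    import java.io.File

    // Hypothetical helper mirroring getActualSchemaFilePath: prefer whatever
    // file in the metadata directory starts with "schema", otherwise fall
    // back to the default schema file path.
    def actualSchemaFilePath(tablePath: String): String = {
      val metaDir = new File(tablePath, "Metadata")
      val candidates = Option(metaDir.listFiles()).getOrElse(Array.empty[File])
        .filter(_.getName.startsWith("schema"))
      if (candidates.nonEmpty) candidates.head.getAbsolutePath
      else new File(metaDir, "schema").getAbsolutePath
    }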

http://git-wip-us.apache.org/repos/asf/carbondata/blob/02eefca1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 86d25b4..262adf2 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -404,4 +404,51 @@ object CarbonScalaUtil {
       })
     otherFields
   }
+
+
+  /**
+   * If the table comes from an old store, the table parameters are stored in
+   * lower case, while the current code reads them in camel case. This method
+   * restores the camel-case keys by deserializing the schema parts.
+   *
+   * @param parameters raw table parameters as read from the catalog
+   * @return the parameters with camel-case keys restored
+   */
+  def getDeserializedParameters(parameters: Map[String, String]): Map[String, String] = {
+    val keyParts = parameters.getOrElse("spark.sql.sources.options.keys.numparts", "0").toInt
+    if (keyParts == 0) {
+      parameters
+    } else {
+      val keyStr = 0 until keyParts map {
+        i => parameters(s"spark.sql.sources.options.keys.part.$i")
+      }
+      val finalProperties = scala.collection.mutable.Map.empty[String, String]
+      keyStr foreach {
+        key =>
+          var value = ""
+          for (numValues <- 0 until parameters(key.toLowerCase() + ".numparts").toInt) {
+            value += parameters(key.toLowerCase() + ".part" + numValues)
+          }
+          finalProperties.put(key, value)
+      }
+      // The database name is normally taken from the parameters, but some old
+      // schemas never wrote the dbName. To be safe, also extract it from the
+      // tableName when the name is qualified as "db.table".
+      // Note: split takes a regex, so the dot has to be escaped.
+      val dbAndTableName = finalProperties("tableName").split("\\.")
+      if (dbAndTableName.length > 1) {
+        finalProperties.put("dbName", dbAndTableName(0))
+        finalProperties.put("tableName", dbAndTableName(1))
+      } else {
+        finalProperties.put("tableName", dbAndTableName(0))
+      }
+      // Prefer a lower-case `tablepath` when it already exists. This happens
+      // when an old table schema has been updated by the new code, leaving
+      // both `path` and `tablepath` in the parameters.
+      parameters.get("tablepath") match {
+        case Some(tablePath) => finalProperties.put("tablePath", tablePath)
+        case None =>
+      }
+      finalProperties.toMap
+    }
+  }
+
 }

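A usage sketch for getDeserializedParameters, with a hypothetical serialized
parameter map for a table default.t1 (the key layout follows the code above;
the values are made up):

    val serialized = Map(
      "spark.sql.sources.options.keys.numparts" -> "2",
      "spark.sql.sources.options.keys.part.0" -> "tableName",
      "spark.sql.sources.options.keys.part.1" -> "tablePath",
      "tablename.numparts" -> "1",
      "tablename.part0" -> "default.t1",
      "tablepath.numparts" -> "1",
      "tablepath.part0" -> "/store/default/t1")

    CarbonScalaUtil.getDeserializedParameters(serialized)
    // => Map("tableName" -> "t1", "dbName" -> "default",
    //        "tablePath" -> "/store/default/t1")
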
http://git-wip-us.apache.org/repos/asf/carbondata/blob/02eefca1/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index e61b636..7d70534 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -42,6 +42,7 @@ import org.apache.carbondata.core.metadata.schema.table.TableInfo
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.spark.CarbonOption
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.util.CarbonScalaUtil
 import org.apache.carbondata.streaming.{CarbonStreamException, StreamSinkFactory}
 
 /**
@@ -59,16 +60,20 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
     CarbonEnv.getInstance(sqlContext.sparkSession)
     // if a path is provided we can directly create the Hadoop relation,
     // otherwise create a datasource relation
-    parameters.get("tablePath") match {
+    val newParameters = CarbonScalaUtil.getDeserializedParameters(parameters)
+    newParameters.get("tablePath") match {
       case Some(path) => CarbonDatasourceHadoopRelation(sqlContext.sparkSession,
         Array(path),
-        parameters,
+        newParameters,
         None)
       case _ =>
-        val options = new CarbonOption(parameters)
+        val options = new CarbonOption(newParameters)
         val tablePath =
           CarbonEnv.getTablePath(options.dbName, options.tableName)(sqlContext.sparkSession)
-        CarbonDatasourceHadoopRelation(sqlContext.sparkSession, Array(tablePath), parameters, None)
+        CarbonDatasourceHadoopRelation(sqlContext.sparkSession,
+          Array(tablePath),
+          newParameters,
+          None)
     }
   }
 
@@ -79,13 +84,14 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
       parameters: Map[String, String],
       data: DataFrame): BaseRelation = {
     CarbonEnv.getInstance(sqlContext.sparkSession)
+    val newParameters = CarbonScalaUtil.getDeserializedParameters(parameters)
     // The user should not specify a path since only one store is currently
     // supported in carbon; once multi-store is supported, this limitation
     // can be removed.
-    require(!parameters.contains("path"), "'path' should not be specified, " +
+    require(!newParameters.contains("path"), "'path' should not be specified, " +
                                           "the path to store carbon file is the 'storePath' " +
                                           "specified when creating CarbonContext")
 
-    val options = new CarbonOption(parameters)
+    val options = new CarbonOption(newParameters)
     val tablePath = new Path(
       CarbonEnv.getTablePath(options.dbName, options.tableName)(sqlContext.sparkSession))
     val isExists = tablePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
@@ -108,12 +114,12 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
 
     if (doSave) {
       // save data when the save mode is Overwrite.
-      new CarbonDataFrameWriter(sqlContext, data).saveAsCarbonFile(parameters)
+      new CarbonDataFrameWriter(sqlContext, data).saveAsCarbonFile(newParameters)
     } else if (doAppend) {
-      new CarbonDataFrameWriter(sqlContext, data).appendToCarbonFile(parameters)
+      new CarbonDataFrameWriter(sqlContext, data).appendToCarbonFile(newParameters)
     }
 
-    createRelation(sqlContext, parameters, data.schema)
+    createRelation(sqlContext, newParameters, data.schema)
   }
 
   // called by DDL operation with a USING clause
@@ -123,9 +129,10 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
       dataSchema: StructType): BaseRelation = {
     CarbonEnv.getInstance(sqlContext.sparkSession)
     addLateDecodeOptimization(sqlContext.sparkSession)
+    val newParameters = CarbonScalaUtil.getDeserializedParameters(parameters)
     val dbName: String =
-      CarbonEnv.getDatabaseName(parameters.get("dbName"))(sqlContext.sparkSession)
-    val tableOption: Option[String] = parameters.get("tableName")
+      CarbonEnv.getDatabaseName(newParameters.get("dbName"))(sqlContext.sparkSession)
+    val tableOption: Option[String] = newParameters.get("tableName")
     if (tableOption.isEmpty) {
       CarbonException.analysisException("Table creation failed. Table name is not specified")
     }
@@ -136,9 +143,9 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
     }
     val (path, updatedParams) = if (sqlContext.sparkSession.sessionState.catalog.listTables(dbName)
       .exists(_.table.equalsIgnoreCase(tableName))) {
-        getPathForTable(sqlContext.sparkSession, dbName, tableName, parameters)
+        getPathForTable(sqlContext.sparkSession, dbName, tableName, newParameters)
     } else {
-        createTableIfNotExists(sqlContext.sparkSession, parameters, dataSchema)
+        createTableIfNotExists(sqlContext.sparkSession, newParameters, dataSchema)
     }
 
     CarbonDatasourceHadoopRelation(sqlContext.sparkSession, Array(path), updatedParams,