Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/07/11 11:56:12 UTC
[3/3] carbondata git commit: [CARBONDATA-1284] Implement hive based schema storage in carbon

[CARBONDATA-1284] Implement hive based schema storage in carbon
This closes #1149
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2a9debfc
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2a9debfc
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2a9debfc
Branch: refs/heads/metadata
Commit: 2a9debfccfdc5633e5e00e95b860efb8b368ea12
Parents: 4b69c9d
Author: ravipesala <ra...@gmail.com>
Authored: Sat Jul 8 17:14:04 2017 +0530
Committer: jackylk <ja...@huawei.com>
Committed: Tue Jul 11 19:48:57 2017 +0800
----------------------------------------------------------------------
.../dictionary/ManageDictionaryAndBTree.java | 39 +-
.../core/constants/CarbonCommonConstants.java | 4 +
.../carbondata/core/locks/CarbonLockUtil.java | 11 +-
.../apache/carbondata/core/util/CarbonUtil.java | 127 +++
.../core/util/path/CarbonTablePath.java | 4 +
.../partition/TestDDLForPartitionTable.scala | 1 +
.../spark/rdd/AlterTableAddColumnRDD.scala | 7 +
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 2 +
.../apache/carbondata/spark/rdd/Compactor.scala | 2 -
.../spark/rdd/DataManagementFunc.scala | 1 -
.../spark/rdd/NewCarbonDataLoadRDD.scala | 4 -
.../execution/command/carbonTableSchema.scala | 3 -
.../spark/rdd/CarbonDataRDDFactory.scala | 47 +-
.../spark/DictionaryDetailHelper.scala | 18 +-
.../spark/rdd/CarbonDataRDDFactory.scala | 44 +-
.../carbondata/spark/util/CarbonSparkUtil.scala | 15 +-
.../sql/CarbonDatasourceHadoopRelation.scala | 8 +-
.../scala/org/apache/spark/sql/CarbonEnv.scala | 6 +-
.../org/apache/spark/sql/CarbonSource.scala | 93 +-
.../execution/command/AlterTableCommands.scala | 53 +-
.../execution/command/CarbonHiveCommands.scala | 14 +-
.../sql/execution/command/DDLStrategy.scala | 14 +-
.../sql/execution/command/IUDCommands.scala | 5 +-
.../execution/command/carbonTableSchema.scala | 87 +-
.../spark/sql/hive/CarbonFileMetastore.scala | 531 ++++++++++
.../spark/sql/hive/CarbonHiveMetaStore.scala | 287 ++++++
.../apache/spark/sql/hive/CarbonMetaStore.scala | 170 ++++
.../apache/spark/sql/hive/CarbonMetastore.scala | 960 -------------------
.../apache/spark/sql/hive/CarbonRelation.scala | 292 ++++++
.../org/apache/spark/util/AlterTableUtil.scala | 57 +-
.../org/apache/spark/util/CleanFiles.scala | 6 +-
.../apache/spark/util/DeleteSegmentByDate.scala | 8 +-
.../apache/spark/util/DeleteSegmentById.scala | 6 +-
.../org/apache/spark/util/ShowSegments.scala | 6 +-
.../spark/util/AllDictionaryTestCase.scala | 10 +
.../util/ExternalColumnDictionaryTestCase.scala | 10 +
.../carbondata/CarbonDataSourceSuite.scala | 1 +
.../processing/merger/CarbonCompactionUtil.java | 11 +-
38 files changed, 1744 insertions(+), 1220 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java
index a50bf15..bae9189 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java
@@ -67,25 +67,28 @@ public class ManageDictionaryAndBTree {
String metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath();
CarbonFile metadataDir = FileFactory
.getCarbonFile(metadataDirectoryPath, FileFactory.getFileType(metadataDirectoryPath));
- // sort index file is created with dictionary size appended to it. So all the files
- // with a given column ID need to be listed
- CarbonFile[] listFiles = metadataDir.listFiles(new CarbonFileFilter() {
- @Override public boolean accept(CarbonFile path) {
- if (path.getName().startsWith(columnSchema.getColumnUniqueId())) {
- return true;
+ if (metadataDir.exists()) {
+ // sort index file is created with dictionary size appended to it. So all the files
+ // with a given column ID need to be listed
+ CarbonFile[] listFiles = metadataDir.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile path) {
+ if (path.getName().startsWith(columnSchema.getColumnUniqueId())) {
+ return true;
+ }
+ return false;
+ }
+ });
+ for (CarbonFile file : listFiles) {
+ // try catch is inside for loop because even if one deletion fails, other files
+ // still need to be deleted
+ try {
+ FileFactory.deleteFile(file.getCanonicalPath(),
+ FileFactory.getFileType(file.getCanonicalPath()));
+ } catch (IOException e) {
+ LOGGER.error("Failed to delete dictionary or sortIndex file for column "
+ + columnSchema.getColumnName() + " with column ID "
+ + columnSchema.getColumnUniqueId());
}
- return false;
- }
- });
- for (CarbonFile file : listFiles) {
- // try catch is inside for loop because even if one deletion fails, other files
- // still need to be deleted
- try {
- FileFactory
- .deleteFile(file.getCanonicalPath(), FileFactory.getFileType(file.getCanonicalPath()));
- } catch (IOException e) {
- LOGGER.error("Failed to delete dictionary or sortIndex file for column " + columnSchema
- .getColumnName() + "with column ID " + columnSchema.getColumnUniqueId());
}
}
// remove dictionary cache
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 6393131..55a292e 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1264,6 +1264,10 @@ public final class CarbonCommonConstants {
public static final String CARBON_BAD_RECORDS_ACTION_DEFAULT = "FORCE";
+ public static final String ENABLE_HIVE_SCHEMA_META_STORE = "spark.carbon.hive.schema.store";
+
+ public static final String ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT = "false";
+
private CarbonCommonConstants() {
}
}
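
The two constants above add an opt-in switch for hive-based schema storage. A minimal sketch of how a caller could consult it, assuming Spark's standard RuntimeConfig API; the factory that actually reads this property is not part of this hunk, so the helper below is hypothetical:

    import org.apache.spark.sql.SparkSession
    import org.apache.carbondata.core.constants.CarbonCommonConstants

    object HiveSchemaStoreToggle {
      // Hypothetical helper: true when hive-based schema storage is switched on.
      // Both constant names come from the hunk above.
      def isEnabled(spark: SparkSession): Boolean =
        spark.conf.get(
          CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
          CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT).toBoolean
    }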
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
index eaaaf94..60a7564 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
@@ -19,7 +19,7 @@ package org.apache.carbondata.core.locks;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
/**
* This class contains all carbon lock utilities
@@ -66,15 +66,12 @@ public class CarbonLockUtil {
* Given a lock type this method will return a new lock object if not acquired by any other
* operation
*
- * @param carbonTable
+ * @param identifier carbon table identifier used to build the lock
* @param lockType
* @return
*/
- public static ICarbonLock getLockObject(CarbonTable carbonTable,
- String lockType) {
- ICarbonLock carbonLock = CarbonLockFactory
- .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier().getCarbonTableIdentifier(),
- lockType);
+ public static ICarbonLock getLockObject(CarbonTableIdentifier identifier, String lockType) {
+ ICarbonLock carbonLock = CarbonLockFactory.getCarbonLockObj(identifier, lockType);
LOGGER.info("Trying to acquire lock: " + carbonLock);
if (carbonLock.lockWithRetries()) {
LOGGER.info("Successfully acquired the lock " + carbonLock);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 8298600..b9c164a 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -31,8 +31,10 @@ import java.nio.charset.Charset;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.carbondata.common.logging.LogService;
@@ -58,6 +60,7 @@ import org.apache.carbondata.core.metadata.ValueEncoderMeta;
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
@@ -70,9 +73,11 @@ import org.apache.carbondata.core.service.PathService;
import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
import org.apache.carbondata.core.util.path.CarbonStorePath;
import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.core.writer.ThriftWriter;
import org.apache.carbondata.format.DataChunk2;
import org.apache.carbondata.format.DataChunk3;
+import com.google.gson.Gson;
import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
@@ -1720,5 +1725,127 @@ public final class CarbonUtil {
public static boolean isValidBadStorePath(String badRecordsLocation) {
return !(null == badRecordsLocation || badRecordsLocation.length() == 0);
}
+
+ public static String convertToMultiGsonStrings(TableInfo tableInfo, String seperator,
+ String quote, String prefix) {
+ Gson gson = new Gson();
+ String schemaString = gson.toJson(tableInfo);
+ int schemaLen = schemaString.length();
+ int splitLen = 4000;
+ int parts = schemaLen / splitLen;
+ if (schemaLen % splitLen > 0) {
+ parts++;
+ }
+ StringBuilder builder =
+ new StringBuilder(prefix).append(quote).append("carbonSchemaPartsNo").append(quote)
+ .append(seperator).append("'").append(parts).append("',");
+ int runningLen = 0;
+ int endLen = splitLen;
+ for (int i = 0; i < parts; i++) {
+ if (i == parts - 1) {
+ endLen = schemaLen % splitLen;
+ }
+ builder.append(quote).append("carbonSchema").append(i).append(quote).append(seperator);
+ builder.append("'").append(schemaString.substring(runningLen, runningLen + endLen))
+ .append("'");
+ if (i < parts - 1) {
+ builder.append(",");
+ }
+ runningLen += splitLen;
+ }
+ return builder.toString();
+ }
+
+ public static Map<String, String> convertToMultiStringMap(TableInfo tableInfo) {
+ Map<String, String> map = new HashMap<>();
+ Gson gson = new Gson();
+ String schemaString = gson.toJson(tableInfo);
+ int schemaLen = schemaString.length();
+ int splitLen = 4000;
+ int parts = schemaLen / splitLen;
+ if (schemaLen % splitLen > 0) {
+ parts++;
+ }
+ map.put("carbonSchemaPartsNo", parts + "");
+ int runningLen = 0;
+ int endLen = splitLen;
+ for (int i = 0; i < parts; i++) {
+ if (i == parts - 1) {
+ endLen = schemaLen % splitLen;
+ }
+ map.put("carbonSchema" + i, schemaString.substring(runningLen, runningLen + endLen));
+ runningLen += splitLen;
+ }
+ return map;
+ }
+
+
+ public static TableInfo convertGsonToTableInfo(Map<String, String> properties) {
+ Gson gson = new Gson();
+ String partsNo = properties.get("carbonSchemaPartsNo");
+ if (partsNo == null) {
+ return null;
+ }
+ int no = Integer.parseInt(partsNo);
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < no; i++) {
+ String part = properties.get("carbonSchema" + i);
+ if (part == null) {
+ throw new RuntimeException("Some thing wrong in getting schema from hive metastore");
+ }
+ builder.append(part);
+ }
+ TableInfo tableInfo = gson.fromJson(builder.toString(), TableInfo.class);
+ return tableInfo;
+ }
+
+ /**
+ * This method will read the schema file from a given path
+ *
+ * @param schemaFilePath absolute path of the schema file to read
+ * @return the thrift TableInfo deserialized from the schema file
+ */
+ public static org.apache.carbondata.format.TableInfo readSchemaFile(String schemaFilePath)
+ throws IOException {
+ TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
+ public org.apache.thrift.TBase<org.apache.carbondata.format.TableInfo,
+ org.apache.carbondata.format.TableInfo._Fields> create() {
+ return new org.apache.carbondata.format.TableInfo();
+ }
+ };
+ ThriftReader thriftReader = new ThriftReader(schemaFilePath, createTBase);
+ thriftReader.open();
+ org.apache.carbondata.format.TableInfo tableInfo =
+ (org.apache.carbondata.format.TableInfo) thriftReader.read();
+ thriftReader.close();
+ return tableInfo;
+ }
+
+ public static void writeThriftTableToSchemaFile(String schemaFilePath,
+ org.apache.carbondata.format.TableInfo tableInfo) throws IOException {
+ ThriftWriter thriftWriter = new ThriftWriter(schemaFilePath, false);
+ try {
+ thriftWriter.open();
+ thriftWriter.write(tableInfo);
+ } finally {
+ thriftWriter.close();
+ }
+ }
+
+ public static void createDatabaseDirectory(String dbName, String storePath) throws IOException {
+ String databasePath = storePath + File.separator + dbName.toLowerCase();
+ FileFactory.FileType fileType = FileFactory.getFileType(databasePath);
+ FileFactory.mkdirs(databasePath, fileType);
+ }
+
+ public static void dropDatabaseDirectory(String dbName, String storePath)
+ throws IOException, InterruptedException {
+ String databasePath = storePath + File.separator + dbName;
+ FileFactory.FileType fileType = FileFactory.getFileType(databasePath);
+ if (FileFactory.isFileExist(databasePath, fileType)) {
+ CarbonFile dbPath = FileFactory.getCarbonFile(databasePath, fileType);
+ CarbonUtil.deleteFoldersAndFiles(dbPath);
+ }
+ }
}
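
The conversion methods above define the storage scheme for hive-backed schemas: the wrapper TableInfo is serialized to JSON with Gson and split into chunks of at most 4000 characters under keys "carbonSchema0", "carbonSchema1", ..., with the part count recorded under "carbonSchemaPartsNo" (presumably to stay within the metastore's limits on property value length). A round-trip sketch using only the methods added in this hunk:

    import org.apache.carbondata.core.metadata.schema.table.TableInfo
    import org.apache.carbondata.core.util.CarbonUtil

    object SchemaRoundTripSketch {
      // Split a TableInfo into <= 4000-char property chunks, then rebuild it.
      def roundTrip(tableInfo: TableInfo): TableInfo = {
        // Produces "carbonSchemaPartsNo" plus "carbonSchema0".."carbonSchemaN".
        val parts = CarbonUtil.convertToMultiStringMap(tableInfo)
        // Reassembles the JSON string and parses it back with Gson.
        CarbonUtil.convertGsonToTableInfo(parts)
      }
    }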
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index 101419d..5824d76 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -432,6 +432,10 @@ public class CarbonTablePath extends Path {
return tablePath + File.separator + FACT_DIR;
}
+ public CarbonTableIdentifier getCarbonTableIdentifier() {
+ return carbonTableIdentifier;
+ }
+
@Override public boolean equals(Object o) {
if (!(o instanceof CarbonTablePath)) {
return false;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala
index 5b77a24..c38b249 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala
@@ -128,6 +128,7 @@ class TestDDLForPartitionTable extends QueryTest with BeforeAndAfterAll {
}
test("test describe formatted for partition column") {
+ sql("drop table if exists des")
sql(
"""create table des(a int, b string) partitioned by (c string) stored by 'carbondata'
|tblproperties ('partition_type'='list','list_info'='1,2')""".stripMargin)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
index 7eea95d..2fb72f7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
@@ -22,6 +22,7 @@ import org.apache.spark.rdd.RDD
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.CarbonTableIdentifier
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
@@ -80,6 +81,12 @@ class AlterTableAddColumnRDD[K, V](sc: SparkContext,
CarbonCommonConstants.DEFAULT_CHARSET_CLASS)
}
CarbonProperties.getInstance.addProperty(CarbonCommonConstants.LOCK_TYPE, lockType)
+ // Create table and metadata folders if not exist
+ val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath
+ val fileType = FileFactory.getFileType(metadataDirectoryPath)
+ if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
+ FileFactory.mkdirs(metadataDirectoryPath, fileType)
+ }
GlobalDictionaryUtil
.loadDefaultDictionaryValueForNewColumn(carbonTablePath,
columnSchema,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 815dba3..bc5ca06 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -265,6 +265,8 @@ class CarbonMergerRDD[K, V](
val jobConf: JobConf = new JobConf(new Configuration)
val job: Job = new Job(jobConf)
val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
+ CarbonInputFormat.setTableInfo(job.getConfiguration,
+ carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getTableInfo)
var updateDetails: UpdateVO = null
// initialise query_id for job
job.getConfiguration.set("query.id", queryId)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
index 1a237f6..4cebcd8 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
@@ -39,7 +39,6 @@ object Compactor {
val storePath = compactionCallableModel.storePath
val storeLocation = compactionCallableModel.storeLocation
val carbonTable = compactionCallableModel.carbonTable
- val cubeCreationTime = compactionCallableModel.cubeCreationTime
val loadsToMerge = compactionCallableModel.loadsToMerge
val sc = compactionCallableModel.sqlContext
val carbonLoadModel = compactionCallableModel.carbonLoadModel
@@ -57,7 +56,6 @@ object Compactor {
storePath,
carbonTable.getMetaDataFilepath,
mergedLoadName,
- cubeCreationTime,
databaseName,
factTableName,
validSegments,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
index 954303f..5ab8160 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
@@ -250,7 +250,6 @@ object DataManagementFunc {
carbonLoadModel,
storeLocation,
compactionModel.carbonTable,
- compactionModel.tableCreationTime,
loadsToMerge,
sqlContext,
compactionModel.compactionType
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 129c642..7cbe9a4 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -388,8 +388,6 @@ class NewDataFrameLoaderRDD[K, V](
result: DataLoadResult[K, V],
carbonLoadModel: CarbonLoadModel,
loadCount: Integer,
- tableCreationTime: Long,
- schemaLastUpdatedTime: Long,
prev: DataLoadCoalescedRDD[Row]) extends CarbonRDD[(K, V)](prev) {
override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
@@ -583,8 +581,6 @@ class PartitionTableDataLoaderRDD[K, V](
result: DataLoadResult[K, V],
carbonLoadModel: CarbonLoadModel,
loadCount: Integer,
- tableCreationTime: Long,
- schemaLastUpdatedTime: Long,
prev: RDD[Row]) extends CarbonRDD[(K, V)](prev) {
override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 6174f7c..500e18e 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -90,7 +90,6 @@ case class CarbonMergerMapping(storeLocation: String,
hdfsStoreLocation: String,
metadataFilePath: String,
var mergedLoadName: String,
- tableCreationTime: Long,
databaseName: String,
factTableName: String,
validSegments: Array[String],
@@ -117,14 +116,12 @@ case class UpdateTableModel(isUpdate: Boolean,
case class CompactionModel(compactionSize: Long,
compactionType: CompactionType,
carbonTable: CarbonTable,
- tableCreationTime: Long,
isDDLTrigger: Boolean)
case class CompactionCallableModel(storePath: String,
carbonLoadModel: CarbonLoadModel,
storeLocation: String,
carbonTable: CarbonTable,
- cubeCreationTime: Long,
loadsToMerge: util.List[LoadMetadataDetails],
sqlContext: SQLContext,
compactionType: CompactionType)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 3579b8a..31b05bc 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -104,8 +104,6 @@ object CarbonDataRDDFactory {
LOGGER.audit(s"Compaction request received for table " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
- val tableCreationTime = CarbonEnv.get.carbonMetastore
- .getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
if (null == carbonLoadModel.getLoadMetadataDetails) {
CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
@@ -118,7 +116,6 @@ object CarbonDataRDDFactory {
val compactionModel = CompactionModel(compactionSize,
compactionType,
carbonTable,
- tableCreationTime,
isCompactionTriggerByDDl
)
@@ -272,23 +269,19 @@ object CarbonDataRDDFactory {
if (!isConcurrentCompactionAllowed) {
LOGGER.info("System level compaction lock is enabled.")
val skipCompactionTables = ListBuffer[CarbonTableIdentifier]()
- var tableForCompaction = CarbonCompactionUtil
- .getNextTableToCompact(CarbonEnv.get.carbonMetastore.metadata.tablesMeta.toArray,
+ var table: CarbonTable = CarbonCompactionUtil
+ .getNextTableToCompact(CarbonEnv.get.carbonMetastore.metadata.
+ tablesMeta.map(_.carbonTable).toArray,
skipCompactionTables.toList.asJava)
- while (null != tableForCompaction) {
+ while (null != table) {
LOGGER.info("Compaction request has been identified for table " +
- s"${ tableForCompaction.carbonTable.getDatabaseName }." +
- s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
- val table: CarbonTable = tableForCompaction.carbonTable
+ s"${ table.getDatabaseName }." +
+ s"${ table.getFactTableName }")
val metadataPath = table.getMetaDataFilepath
val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath)
val newCarbonLoadModel = new CarbonLoadModel()
DataManagementFunc.prepareCarbonLoadModel(storePath, table, newCarbonLoadModel)
- val tableCreationTime = CarbonEnv.get.carbonMetastore
- .getTableCreationTime(newCarbonLoadModel.getDatabaseName,
- newCarbonLoadModel.getTableName
- )
val compactionSize = CarbonDataMergerUtil
.getCompactionSize(CompactionType.MAJOR_COMPACTION)
@@ -296,7 +289,6 @@ object CarbonDataRDDFactory {
val newcompactionModel = CompactionModel(compactionSize,
compactionType,
table,
- tableCreationTime,
compactionModel.isDDLTrigger
)
// proceed for compaction
@@ -309,8 +301,8 @@ object CarbonDataRDDFactory {
} catch {
case e: Exception =>
LOGGER.error("Exception in compaction thread for table " +
- s"${ tableForCompaction.carbonTable.getDatabaseName }." +
- s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
+ s"${ table.getDatabaseName }." +
+ s"${ table.getFactTableName }")
// not handling the exception. only logging as this is not the table triggered
// by user.
} finally {
@@ -319,16 +311,16 @@ object CarbonDataRDDFactory {
.deleteCompactionRequiredFile(metadataPath, compactionType)) {
// if the compaction request file is not been able to delete then
// add those tables details to the skip list so that it wont be considered next.
- skipCompactionTables.+=:(tableForCompaction.carbonTableIdentifier)
+ skipCompactionTables.+=:(table.getCarbonTableIdentifier)
LOGGER.error("Compaction request file can not be deleted for table " +
- s"${ tableForCompaction.carbonTable.getDatabaseName }." +
- s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
+ s"${ table.getDatabaseName }." +
+ s"${ table.getFactTableName }")
}
}
// ********* check again for all the tables.
- tableForCompaction = CarbonCompactionUtil
+ table = CarbonCompactionUtil
.getNextTableToCompact(CarbonEnv.get.carbonMetastore.metadata
- .tablesMeta.toArray, skipCompactionTables.asJava
+ .tablesMeta.map(_.carbonTable).toArray, skipCompactionTables.asJava
)
}
// giving the user his error for telling in the beeline if his triggered table
@@ -360,7 +352,7 @@ object CarbonDataRDDFactory {
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val isAgg = false
// for handling of the segment Merging.
- def handleSegmentMerging(tableCreationTime: Long): Unit = {
+ def handleSegmentMerging(): Unit = {
LOGGER.info(s"compaction need status is" +
s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired() }")
if (CarbonDataMergerUtil.checkIfAutoLoadMergingRequired()) {
@@ -371,7 +363,6 @@ object CarbonDataRDDFactory {
val compactionModel = CompactionModel(compactionSize,
CompactionType.MINOR_COMPACTION,
carbonTable,
- tableCreationTime,
isCompactionTriggerByDDl
)
var storeLocation = ""
@@ -479,10 +470,6 @@ object CarbonDataRDDFactory {
// reading the start time of data load.
val loadStartTime = CarbonUpdateUtil.readCurrentTime();
carbonLoadModel.setFactTimeStamp(loadStartTime)
- val tableCreationTime = CarbonEnv.get.carbonMetastore
- .getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
- val schemaLastUpdatedTime = CarbonEnv.get.carbonMetastore
- .getSchemaLastUpdatedTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
// get partition way from configuration
// val isTableSplitPartition = CarbonProperties.getInstance().getProperty(
@@ -634,8 +621,6 @@ object CarbonDataRDDFactory {
new DataLoadResultImpl(),
carbonLoadModel,
currentLoadCount,
- tableCreationTime,
- schemaLastUpdatedTime,
newRdd).collect()
} catch {
case ex: Exception =>
@@ -748,8 +733,6 @@ object CarbonDataRDDFactory {
new DataLoadResultImpl(),
carbonLoadModel,
currentLoadCount,
- tableCreationTime,
- schemaLastUpdatedTime,
rdd).collect()
} catch {
case ex: Exception =>
@@ -974,7 +957,7 @@ object CarbonDataRDDFactory {
}
try {
// compaction handling
- handleSegmentMerging(tableCreationTime)
+ handleSegmentMerging()
} catch {
case e: Exception =>
throw new Exception(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala
index 32ba6cf..779ace1 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala
@@ -36,15 +36,17 @@ class DictionaryDetailHelper extends DictionaryDetailService {
// Metadata folder
val metadataDirectory = FileFactory.getCarbonFile(dictfolderPath, fileType)
// need list all dictionary file paths with exists flag
- val carbonFiles = metadataDirectory.listFiles(new CarbonFileFilter {
- @Override def accept(pathname: CarbonFile): Boolean = {
- CarbonTablePath.isDictionaryFile(pathname)
- }
- })
- // 2 put dictionary file names to fileNamesMap
val fileNamesMap = new HashMap[String, Int]
- for (i <- 0 until carbonFiles.length) {
- fileNamesMap.put(carbonFiles(i).getName, i)
+ if (metadataDirectory.exists()) {
+ val carbonFiles = metadataDirectory.listFiles(new CarbonFileFilter {
+ @Override def accept(pathname: CarbonFile): Boolean = {
+ CarbonTablePath.isDictionaryFile(pathname)
+ }
+ })
+ // 2 put dictionary file names to fileNamesMap
+ for (i <- 0 until carbonFiles.length) {
+ fileNamesMap.put(carbonFiles(i).getName, i)
+ }
}
// 3 lookup fileNamesMap, if file name is in fileNamesMap, file is exists, or not.
primDimensions.zipWithIndex.foreach { f =>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 5c20808..fc813d1 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -105,8 +105,6 @@ object CarbonDataRDDFactory {
LOGGER.audit(s"Compaction request received for table " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
- val tableCreationTime = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore
- .getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
if (null == carbonLoadModel.getLoadMetadataDetails) {
CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
@@ -124,7 +122,6 @@ object CarbonDataRDDFactory {
val compactionModel = CompactionModel(compactionSize,
compactionType,
carbonTable,
- tableCreationTime,
isCompactionTriggerByDDl
)
@@ -285,22 +282,18 @@ object CarbonDataRDDFactory {
val skipCompactionTables = ListBuffer[CarbonTableIdentifier]()
var tableForCompaction = CarbonCompactionUtil
.getNextTableToCompact(CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore
- .metadata.tablesMeta.toArray,
+ .listAllTables(sqlContext.sparkSession).toArray,
skipCompactionTables.toList.asJava)
while (null != tableForCompaction) {
LOGGER.info("Compaction request has been identified for table " +
- s"${ tableForCompaction.carbonTable.getDatabaseName }." +
- s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
- val table: CarbonTable = tableForCompaction.carbonTable
+ s"${ tableForCompaction.getDatabaseName }." +
+ s"${ tableForCompaction.getFactTableName}")
+ val table: CarbonTable = tableForCompaction
val metadataPath = table.getMetaDataFilepath
val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath)
val newCarbonLoadModel = new CarbonLoadModel()
DataManagementFunc.prepareCarbonLoadModel(storePath, table, newCarbonLoadModel)
- val tableCreationTime = CarbonEnv.getInstance(sqlContext.sparkSession)
- .carbonMetastore.getTableCreationTime(newCarbonLoadModel.getDatabaseName,
- newCarbonLoadModel.getTableName
- )
val compactionSize = CarbonDataMergerUtil
.getCompactionSize(CompactionType.MAJOR_COMPACTION)
@@ -308,7 +301,6 @@ object CarbonDataRDDFactory {
val newcompactionModel = CompactionModel(compactionSize,
compactionType,
table,
- tableCreationTime,
compactionModel.isDDLTrigger
)
// proceed for compaction
@@ -321,8 +313,8 @@ object CarbonDataRDDFactory {
} catch {
case e: Exception =>
LOGGER.error("Exception in compaction thread for table " +
- s"${ tableForCompaction.carbonTable.getDatabaseName }." +
- s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
+ s"${ tableForCompaction.getDatabaseName }." +
+ s"${ tableForCompaction.getFactTableName }")
// not handling the exception. only logging as this is not the table triggered
// by user.
} finally {
@@ -331,17 +323,17 @@ object CarbonDataRDDFactory {
.deleteCompactionRequiredFile(metadataPath, compactionType)) {
// if the compaction request file is not been able to delete then
// add those tables details to the skip list so that it wont be considered next.
- skipCompactionTables.+=:(tableForCompaction.carbonTableIdentifier)
+ skipCompactionTables.+=:(tableForCompaction.getCarbonTableIdentifier)
LOGGER.error("Compaction request file can not be deleted for table " +
- s"${ tableForCompaction.carbonTable.getDatabaseName }." +
- s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
+ s"${ tableForCompaction.getDatabaseName }." +
+ s"${ tableForCompaction.getFactTableName }")
}
}
// ********* check again for all the tables.
tableForCompaction = CarbonCompactionUtil
.getNextTableToCompact(CarbonEnv.getInstance(sqlContext.sparkSession)
- .carbonMetastore.metadata
- .tablesMeta.toArray, skipCompactionTables.asJava
+ .carbonMetastore.listAllTables(sqlContext.sparkSession).toArray,
+ skipCompactionTables.asJava
)
}
}
@@ -373,7 +365,7 @@ object CarbonDataRDDFactory {
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val isAgg = false
// for handling of the segment Merging.
- def handleSegmentMerging(tableCreationTime: Long): Unit = {
+ def handleSegmentMerging(): Unit = {
LOGGER.info(s"compaction need status is" +
s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired() }")
if (CarbonDataMergerUtil.checkIfAutoLoadMergingRequired()) {
@@ -384,7 +376,6 @@ object CarbonDataRDDFactory {
val compactionModel = CompactionModel(compactionSize,
CompactionType.MINOR_COMPACTION,
carbonTable,
- tableCreationTime,
isCompactionTriggerByDDl
)
var storeLocation = ""
@@ -492,11 +483,6 @@ object CarbonDataRDDFactory {
// reading the start time of data load.
val loadStartTime = CarbonUpdateUtil.readCurrentTime();
carbonLoadModel.setFactTimeStamp(loadStartTime)
- val tableCreationTime = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore
- .getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
- val schemaLastUpdatedTime = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore
- .getSchemaLastUpdatedTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
-
// get partition way from configuration
// val isTableSplitPartition = CarbonProperties.getInstance().getProperty(
// CarbonCommonConstants.TABLE_SPLIT_PARTITION,
@@ -645,8 +631,6 @@ object CarbonDataRDDFactory {
new DataLoadResultImpl(),
carbonLoadModel,
currentLoadCount,
- tableCreationTime,
- schemaLastUpdatedTime,
newRdd).collect()
} catch {
@@ -760,8 +744,6 @@ object CarbonDataRDDFactory {
new DataLoadResultImpl(),
carbonLoadModel,
currentLoadCount,
- tableCreationTime,
- schemaLastUpdatedTime,
rdd).collect()
} catch {
case ex: Exception =>
@@ -998,7 +980,7 @@ object CarbonDataRDDFactory {
}
try {
// compaction handling
- handleSegmentMerging(tableCreationTime)
+ handleSegmentMerging()
} catch {
case e: Exception =>
throw new Exception(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
index 2f65fbc..d1d3015 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
@@ -19,11 +19,13 @@ package org.apache.carbondata.spark.util
import scala.collection.JavaConverters._
-import org.apache.spark.sql.hive.{CarbonMetaData, DictionaryMap}
+import org.apache.spark.sql.hive.{CarbonMetaData, CarbonRelation, DictionaryMap}
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.processing.merger.TableMeta
case class TransformHolder(rdd: Any, mataData: CarbonMetaData)
@@ -42,4 +44,13 @@ object CarbonSparkUtil {
}
CarbonMetaData(dimensionsAttr, measureAttr, carbonTable, DictionaryMap(dictionary.toMap))
}
+
+ def createCarbonRelation(tableInfo: TableInfo, tablePath: String): CarbonRelation = {
+ val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+ val table = CarbonTable.buildFromTableInfo(tableInfo)
+ val meta = new TableMeta(identifier.getCarbonTableIdentifier, identifier.getStorePath, table)
+ CarbonRelation(tableInfo.getDatabaseName, tableInfo.getFactTable.getTableName,
+ CarbonSparkUtil.createSparkMeta(table), meta)
+ }
+
}
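
createCarbonRelation lets callers rebuild a queryable relation directly from a deserialized TableInfo. A hedged usage sketch combining it with the CarbonUtil helper added earlier; the properties map is assumed to hold the carbonSchemaPartsNo/carbonSchemaN entries written by this commit:

    import org.apache.spark.sql.hive.CarbonRelation
    import org.apache.carbondata.core.util.CarbonUtil
    import org.apache.carbondata.spark.util.CarbonSparkUtil

    object RelationFromPropertiesSketch {
      // props: hive table properties carrying the split carbon schema.
      def relationFor(props: java.util.Map[String, String],
          tablePath: String): CarbonRelation = {
        val tableInfo = CarbonUtil.convertGsonToTableInfo(props)
        CarbonSparkUtil.createCarbonRelation(tableInfo, tablePath)
      }
    }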
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index d1baf79..7411e6e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -54,11 +54,9 @@ case class CarbonDatasourceHadoopRelation(
ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
@transient lazy val carbonRelation: CarbonRelation =
- CarbonEnv.getInstance(sparkSession).carbonMetastore
- .lookupRelation(
- Some(identifier.getCarbonTableIdentifier.getDatabaseName),
- identifier.getCarbonTableIdentifier.getTableName)(sparkSession)
- .asInstanceOf[CarbonRelation]
+ CarbonEnv.getInstance(sparkSession).carbonMetastore.
+ createCarbonRelation(parameters, identifier, sparkSession)
+
@transient lazy val carbonTable: CarbonTable = carbonRelation.tableMeta.carbonTable
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 925b82b..d19eb39 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql
import java.util.Map
import java.util.concurrent.ConcurrentHashMap
-import org.apache.spark.sql.hive.{CarbonMetastore, CarbonSessionCatalog}
+import org.apache.spark.sql.hive.{CarbonMetaStore, CarbonMetaStoreFactory, CarbonSessionCatalog}
import org.apache.spark.sql.internal.CarbonSQLConf
import org.apache.carbondata.common.logging.LogServiceFactory
@@ -34,7 +34,7 @@ import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
*/
class CarbonEnv {
- var carbonMetastore: CarbonMetastore = _
+ var carbonMetastore: CarbonMetaStore = _
var sessionParams: SessionParams = _
@@ -64,7 +64,7 @@ class CarbonEnv {
val storePath =
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION)
LOGGER.info(s"carbon env initial: $storePath")
- new CarbonMetastore(sparkSession.conf, storePath)
+ CarbonMetaStoreFactory.createCarbonMetaStore(sparkSession.conf, storePath)
}
CarbonProperties.getInstance.addProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "true")
initialized = true
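
CarbonMetaStoreFactory itself lives in the new metastore files (CarbonFileMetastore.scala, CarbonHiveMetaStore.scala, CarbonMetaStore.scala in the diffstat), which this hunk does not show. A plausible sketch of its selection logic, assuming it keys off the new property; the constructor signatures below are assumptions, not the committed code:

    import org.apache.spark.sql.RuntimeConfig
    import org.apache.spark.sql.hive.{CarbonFileMetastore, CarbonHiveMetaStore, CarbonMetaStore}
    import org.apache.carbondata.core.constants.CarbonCommonConstants

    object CarbonMetaStoreFactorySketch {
      // Assumed shape: pick the hive-backed store when the toggle is on, else file-based.
      def createCarbonMetaStore(conf: RuntimeConfig, storePath: String): CarbonMetaStore = {
        val useHive = conf.get(
          CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
          CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT).toBoolean
        // Constructor arguments are assumptions; only the class names appear in the diffstat.
        if (useHive) new CarbonHiveMetaStore(conf, storePath)
        else new CarbonFileMetastore(conf, storePath)
      }
    }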
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 1c16143..498ea03 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -17,20 +17,23 @@
package org.apache.spark.sql
+import scala.collection.JavaConverters._
import scala.language.implicitConversions
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.execution.CarbonLateDecodeStrategy
-import org.apache.spark.sql.execution.command.{BucketFields, CreateTable, Field}
+import org.apache.spark.sql.execution.command.{BucketFields, CreateTable, Field, TableModel, TableNewProcessor}
import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{DecimalType, StructType}
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.metadata.schema.table.TableInfo
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.spark.CarbonOption
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
@@ -125,7 +128,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
}
val path = if (sqlContext.sparkSession.sessionState.catalog.listTables(dbName)
.exists(_.table.equalsIgnoreCase(tableName))) {
- getPathForTable(sqlContext.sparkSession, dbName, tableName)
+ getPathForTable(sqlContext.sparkSession, dbName, tableName, parameters)
} else {
createTableIfNotExists(sqlContext.sparkSession, parameters, dataSchema)
}
@@ -148,20 +151,22 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
val dbName: String = parameters.getOrElse("dbName",
CarbonCommonConstants.DATABASE_DEFAULT_NAME).toLowerCase
val tableName: String = parameters.getOrElse("tableName", "").toLowerCase
+
try {
- CarbonEnv.getInstance(sparkSession).carbonMetastore
- .lookupRelation(Option(dbName), tableName)(sparkSession)
- CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + s"/$dbName/$tableName"
+ if (parameters.contains("carbonSchemaPartsNo")) {
+ getPathForTable(sparkSession, dbName, tableName, parameters)
+ } else {
+ CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .lookupRelation(Option(dbName), tableName)(sparkSession)
+ CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + s"/$dbName/$tableName"
+ }
} catch {
case ex: NoSuchTableException =>
- val sqlParser = new CarbonSpark2SqlParser
- val fields = sqlParser.getFields(dataSchema)
- val map = scala.collection.mutable.Map[String, String]()
- parameters.foreach { case (key, value) => map.put(key, value.toLowerCase()) }
- val options = new CarbonOption(parameters)
- val bucketFields = sqlParser.getBucketFields(map, fields, options)
- val cm = sqlParser.prepareTableModel(ifNotExistPresent = false, Option(dbName),
- tableName, fields, Nil, map, bucketFields)
+ val cm: TableModel = CarbonSource.createTableInfoFromParams(
+ parameters,
+ dataSchema,
+ dbName,
+ tableName)
CreateTable(cm, false).run(sparkSession)
CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + s"/$dbName/$tableName"
case ex: Exception =>
@@ -171,13 +176,14 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
/**
* Returns the path of the table
+ *
* @param sparkSession
* @param dbName
* @param tableName
* @return
*/
private def getPathForTable(sparkSession: SparkSession, dbName: String,
- tableName : String): String = {
+ tableName : String, parameters: Map[String, String]): String = {
if (StringUtils.isBlank(tableName)) {
throw new MalformedCarbonCommandException("The Specified Table Name is Blank")
@@ -186,9 +192,13 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
throw new MalformedCarbonCommandException("Table Name Should not have spaces ")
}
try {
- CarbonEnv.getInstance(sparkSession).carbonMetastore
- .lookupRelation(Option(dbName), tableName)(sparkSession)
- CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + s"/$dbName/$tableName"
+ if (parameters.contains("tablePath")) {
+ parameters.get("tablePath").get
+ } else {
+ CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .lookupRelation(Option(dbName), tableName)(sparkSession)
+ CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + s"/$dbName/$tableName"
+ }
} catch {
case ex: Exception =>
throw new Exception(s"Do not have $dbName and $tableName", ex)
@@ -196,3 +206,50 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
}
}
+
+object CarbonSource {
+
+ def createTableInfoFromParams(parameters: Map[String, String],
+ dataSchema: StructType,
+ dbName: String,
+ tableName: String): TableModel = {
+ val sqlParser = new CarbonSpark2SqlParser
+ val fields = sqlParser.getFields(dataSchema)
+ val map = scala.collection.mutable.Map[String, String]()
+ parameters.foreach { case (key, value) => map.put(key, value.toLowerCase()) }
+ val options = new CarbonOption(parameters)
+ val bucketFields = sqlParser.getBucketFields(map, fields, options)
+ sqlParser.prepareTableModel(ifNotExistPresent = false, Option(dbName),
+ tableName, fields, Nil, map, bucketFields)
+ }
+
+ /**
+ * Update spark catalog table with schema information when the schema is stored in the hive metastore
+ * @param tableDesc catalog table to be updated with carbon schema properties
+ * @param sparkSession current spark session
+ * @return the catalog table with updated storage properties
+ */
+ def updateCatalogTableWithCarbonSchema(tableDesc: CatalogTable,
+ sparkSession: SparkSession): CatalogTable = {
+ val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ val storageFormat = tableDesc.storage
+ val properties = storageFormat.properties
+ if (metaStore.isReadFromHiveMetaStore && !properties.contains("carbonSchemaPartsNo")) {
+ val dbName: String = properties.getOrElse("dbName",
+ CarbonCommonConstants.DATABASE_DEFAULT_NAME).toLowerCase
+ val tableName: String = properties.getOrElse("tableName", "").toLowerCase
+ val model = createTableInfoFromParams(properties, tableDesc.schema, dbName, tableName)
+ val tableInfo: TableInfo = TableNewProcessor(model)
+ val (tablePath, carbonSchemaString) =
+ metaStore.createTableFromThrift(tableInfo, dbName, tableName)(sparkSession)
+ val map = CarbonUtil.convertToMultiStringMap(tableInfo)
+ properties.foreach(e => map.put(e._1, e._2))
+ map.put("tablePath", tablePath)
+ // updating params
+ val updatedFormat = storageFormat.copy(properties = map.asScala.toMap)
+ tableDesc.copy(storage = updatedFormat)
+ } else {
+ tableDesc
+ }
+ }
+}
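
After updateCatalogTableWithCarbonSchema runs, the table's storage properties carry everything needed to rebuild the schema without touching the file-based store. Roughly, with illustrative values (only the key names come from the code above):

    // Illustrative storage-properties layout; values are made up.
    Map(
      "dbName"              -> "default",
      "tableName"           -> "t1",
      "tablePath"           -> "/opt/carbonStore/default/t1",
      "carbonSchemaPartsNo" -> "2",
      "carbonSchema0"       -> "<first 4000 chars of the gson-encoded TableInfo>",
      "carbonSchema1"       -> "<remaining chars>"
    )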
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
index b08c113..0d5a821 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
@@ -63,16 +63,15 @@ private[sql] case class AlterTableAddColumns(
// completion of 1st operation but as look up relation is called before it will have the
// older carbon table and this can lead to inconsistent state in the system. Therefor look
// up relation should be called after acquiring the lock
- carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ carbonTable = metastore
.lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
.tableMeta.carbonTable
// get the latest carbon table and check for column existence
// read the latest schema file
val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
carbonTable.getCarbonTableIdentifier)
- val tableMetadataFile = carbonTablePath.getSchemaFilePath
- val thriftTableInfo: TableInfo = CarbonEnv.getInstance(sparkSession).carbonMetastore
- .readSchemaFile(tableMetadataFile)
+ val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
val schemaConverter = new ThriftWrapperSchemaConverterImpl()
val wrapperTableInfo = schemaConverter
.fromExternalToWrapperTableInfo(thriftTableInfo,
@@ -104,8 +103,8 @@ private[sql] case class AlterTableAddColumns(
LOGGER.info(s"Alter table for add columns is successful for table $dbName.$tableName")
LOGGER.audit(s"Alter table for add columns is successful for table $dbName.$tableName")
} catch {
- case e: Exception => LOGGER
- .error("Alter table add columns failed :" + e.getMessage)
+ case e: Exception =>
+ LOGGER.error(e, "Alter table add columns failed")
if (newCols.nonEmpty) {
LOGGER.info("Cleaning up the dictionary files as alter table add operation failed")
new AlterTableDropColumnRDD(sparkSession.sparkContext,
@@ -147,9 +146,9 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
val newTableName = newTableIdentifier.table.toLowerCase
LOGGER.audit(s"Rename table request has been received for $oldDatabaseName.$oldTableName")
LOGGER.info(s"Rename table request has been received for $oldDatabaseName.$oldTableName")
+ val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
val relation: CarbonRelation =
- CarbonEnv.getInstance(sparkSession).carbonMetastore
- .lookupRelation(oldTableIdentifier.database, oldTableName)(sparkSession)
+ metastore.lookupRelation(oldTableIdentifier.database, oldTableName)(sparkSession)
.asInstanceOf[CarbonRelation]
if (relation == null) {
LOGGER.audit(s"Rename table request has failed. " +
@@ -168,15 +167,14 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
locks = AlterTableUtil
.validateTableAndAcquireLock(oldDatabaseName, oldTableName, locksToBeAcquired)(
sparkSession)
- carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
- .lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession)
+ carbonTable = metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession)
.asInstanceOf[CarbonRelation].tableMeta.carbonTable
// get the latest carbon table and check for column existence
val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
carbonTable.getCarbonTableIdentifier)
- val tableMetadataFile = carbonTablePath.getSchemaFilePath
- val tableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.getInstance(sparkSession)
- .carbonMetastore.readSchemaFile(tableMetadataFile)
+ val tableMetadataFile = carbonTablePath.getPath
+ val tableInfo: org.apache.carbondata.format.TableInfo =
+ metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis)
schemaEvolutionEntry.setTableName(newTableName)
timeStamp = System.currentTimeMillis()
@@ -193,15 +191,13 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
}
}
val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName,
- newTableName,
- carbonTable.getCarbonTableIdentifier.getTableId)
- val newTablePath = CarbonEnv.getInstance(sparkSession).carbonMetastore
- .updateTableSchema(newTableIdentifier,
+ newTableName, carbonTable.getCarbonTableIdentifier.getTableId)
+ val newTablePath = metastore.updateTableSchema(newTableIdentifier,
+ carbonTable.getCarbonTableIdentifier,
tableInfo,
schemaEvolutionEntry,
carbonTable.getStorePath)(sparkSession)
- CarbonEnv.getInstance(sparkSession).carbonMetastore
- .removeTableFromMetadata(oldDatabaseName, oldTableName)
+ metastore.removeTableFromMetadata(oldDatabaseName, oldTableName)
sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
.runSqlHive(
s"ALTER TABLE $oldDatabaseName.$oldTableName RENAME TO $oldDatabaseName.$newTableName")
@@ -213,8 +209,8 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
LOGGER.audit(s"Table $oldTableName has been successfully renamed to $newTableName")
LOGGER.info(s"Table $oldTableName has been successfully renamed to $newTableName")
} catch {
- case e: Exception => LOGGER
- .error("Rename table failed: " + e.getMessage)
+ case e: Exception =>
+ LOGGER.error(e, "Rename table failed: " + e.getMessage)
if (carbonTable != null) {
AlterTableUtil
.revertRenameTableChanges(oldTableIdentifier,
@@ -279,7 +275,8 @@ private[sql] case class AlterTableDropColumns(
try {
locks = AlterTableUtil
.validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
- carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ carbonTable = metastore
.lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
.tableMeta.carbonTable
val partitionInfo = carbonTable.getPartitionInfo(tableName)
@@ -329,9 +326,8 @@ private[sql] case class AlterTableDropColumns(
// read the latest schema file
val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
carbonTable.getCarbonTableIdentifier)
- val tableMetadataFile = carbonTablePath.getSchemaFilePath
- val tableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.getInstance(sparkSession)
- .carbonMetastore.readSchemaFile(tableMetadataFile)
+ val tableInfo: org.apache.carbondata.format.TableInfo =
+ metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
// maintain the deleted columns for schema evolution history
var deletedColumnSchema = ListBuffer[org.apache.carbondata.format.ColumnSchema]()
val columnSchemaList = tableInfo.fact_table.table_columns.asScala
@@ -393,7 +389,8 @@ private[sql] case class AlterTableDataTypeChange(
try {
locks = AlterTableUtil
.validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
- carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ carbonTable = metastore
.lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
.tableMeta.carbonTable
val columnName = alterTableDataTypeChangeModel.columnName
@@ -415,9 +412,7 @@ private[sql] case class AlterTableDataTypeChange(
// read the latest schema file
val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
carbonTable.getCarbonTableIdentifier)
- val tableMetadataFile = carbonTablePath.getSchemaFilePath
- val tableInfo: TableInfo = CarbonEnv.getInstance(sparkSession).carbonMetastore
- .readSchemaFile(tableMetadataFile)
+ val tableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
// maintain the added column for schema evolution history
var addColumnSchema: ColumnSchema = null
var deletedColumnSchema: ColumnSchema = null
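Reviewer note: across AlterTableRenameTable, AlterTableDropColumns and
AlterTableDataTypeChange the refactor is the same: resolve the metastore once
into a local val, then read the Thrift schema through
metastore.getThriftTableInfo instead of readSchemaFile on a raw path. A
minimal sketch of the new call shape, written as a hypothetical standalone
helper (all calls are taken from the hunks above):

    import org.apache.spark.sql.{CarbonEnv, SparkSession}
    import org.apache.spark.sql.hive.CarbonRelation
    import org.apache.carbondata.core.util.path.CarbonStorePath

    def readThriftSchema(dbName: String, tableName: String)
        (sparkSession: SparkSession): org.apache.carbondata.format.TableInfo = {
      // Resolve the metastore once; after this change it may be the file-based
      // or the hive-based implementation behind the same interface.
      val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
      val carbonTable = metastore
        .lookupRelation(Some(dbName), tableName)(sparkSession)
        .asInstanceOf[CarbonRelation].tableMeta.carbonTable
      val carbonTablePath = CarbonStorePath.getCarbonTablePath(
        carbonTable.getStorePath, carbonTable.getCarbonTableIdentifier)
      // The schema read goes through the metastore abstraction rather than a
      // direct schema-file read.
      metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
    }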
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
index d2022be..609f39b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
@@ -18,9 +18,10 @@
package org.apache.spark.sql.hive.execution.command
import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.{CarbonDropTableCommand, DropDatabaseCommand, ResetCommand, RunnableCommand, SetCommand}
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
extends RunnableCommand {
@@ -29,16 +30,19 @@ case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
override def run(sparkSession: SparkSession): Seq[Row] = {
val dbName = command.databaseName
+ var tablesInDB: Seq[TableIdentifier] = null
+ if (sparkSession.sessionState.catalog.listDatabases().exists(_.equalsIgnoreCase(dbName))) {
+ tablesInDB = sparkSession.sessionState.catalog.listTables(dbName)
+ }
// DropHiveDB command will fail if cascade is false and one or more tables exist in the database
val rows = command.run(sparkSession)
- if (command.cascade) {
- val tablesInDB = CarbonEnv.getInstance(sparkSession).carbonMetastore.getAllTables()
- .filter(_.database.exists(_.equalsIgnoreCase(dbName)))
+ if (command.cascade && tablesInDB != null) {
tablesInDB.foreach { tableName =>
CarbonDropTableCommand(true, tableName.database, tableName.table).run(sparkSession)
}
}
- CarbonEnv.getInstance(sparkSession).carbonMetastore.dropDatabaseDirectory(dbName.toLowerCase)
+ CarbonUtil.dropDatabaseDirectory(dbName.toLowerCase, CarbonEnv.getInstance(sparkSession)
+ .carbonMetastore.storePath)
rows
}
}
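Reviewer note: the ordering here matters. sessionState.catalog can only list
tables while the database still exists, so the table list is captured before
command.run drops the Hive database; the cascade loop and the store-directory
cleanup then work from that snapshot. A minimal sketch of the flow, assuming
dbName and sparkSession are in scope as in the hunk above:

    import org.apache.spark.sql.{CarbonEnv, SparkSession}
    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.carbondata.core.util.CarbonUtil

    // 1. Snapshot the tables while the database is still visible.
    val catalog = sparkSession.sessionState.catalog
    val tablesInDB: Seq[TableIdentifier] =
      if (catalog.listDatabases().exists(_.equalsIgnoreCase(dbName))) {
        catalog.listTables(dbName)
      } else Seq.empty

    // 2. Drop the Hive database (fails if cascade=false and tables remain),
    //    then drop each carbon table from the snapshot when cascading.
    // 3. Finally remove the carbon store directory for the database:
    CarbonUtil.dropDatabaseDirectory(dbName.toLowerCase,
      CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath)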
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
index 6087736..760cb06 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
@@ -16,13 +16,14 @@
*/
package org.apache.spark.sql.execution.command
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, InsertIntoCarbonTable, ShowLoadsCommand, SparkSession}
+import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand}
+import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
/**
@@ -61,7 +62,8 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
_, child: LogicalPlan, _, _) =>
ExecutedCommandExec(LoadTableByInsert(relation, child)) :: Nil
case createDb@CreateDatabaseCommand(dbName, ifNotExists, _, _, _) =>
- CarbonEnv.getInstance(sparkSession).carbonMetastore.createDatabaseDirectory(dbName)
+ CarbonUtil.createDatabaseDirectory(dbName, CarbonEnv.getInstance(sparkSession).
+ carbonMetastore.storePath)
ExecutedCommandExec(createDb) :: Nil
case drop@DropDatabaseCommand(dbName, ifExists, isCascade) =>
ExecutedCommandExec(CarbonDropDatabaseCommand(drop)) :: Nil
@@ -127,6 +129,14 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
ExecutedCommandExec(CarbonSetCommand(set)) :: Nil
case reset@ResetCommand =>
ExecutedCommandExec(CarbonResetCommand()) :: Nil
+ case org.apache.spark.sql.execution.datasources.CreateTable(tableDesc, mode, None)
+ if tableDesc.provider.get != DDLUtils.HIVE_PROVIDER
+ && tableDesc.provider.get.equals("org.apache.spark.sql.CarbonSource") =>
+ val updatedCatalog =
+ CarbonSource.updateCatalogTableWithCarbonSchema(tableDesc, sparkSession)
+ val cmd =
+ CreateDataSourceTableCommand(updatedCatalog, ignoreIfExists = mode == SaveMode.Ignore)
+ ExecutedCommandExec(cmd) :: Nil
case _ => Nil
}
}
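Reviewer note: this new match arm intercepts data-source CREATE TABLE plans
whose provider is org.apache.spark.sql.CarbonSource and rewrites the catalog
entry with the carbon schema before handing off to the stock Spark command. A
sketch of the arm in isolation (names taken from the hunk above):

    import org.apache.spark.sql.{CarbonSource, SaveMode, SparkSession}
    import org.apache.spark.sql.catalyst.catalog.CatalogTable
    import org.apache.spark.sql.execution.command.CreateDataSourceTableCommand

    def planCarbonCreateTable(tableDesc: CatalogTable, mode: SaveMode)
        (sparkSession: SparkSession): CreateDataSourceTableCommand = {
      // Embed the carbon schema into the catalog table so the Hive metastore
      // becomes the source of truth for it.
      val updatedCatalog =
        CarbonSource.updateCatalogTableWithCarbonSchema(tableDesc, sparkSession)
      CreateDataSourceTableCommand(updatedCatalog,
        ignoreIfExists = mode == SaveMode.Ignore)
    }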
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
index 2fccd0c..2c1de52 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
@@ -42,6 +42,7 @@ import org.apache.carbondata.core.statusmanager.{SegmentStatusManager, SegmentUp
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl
+import org.apache.carbondata.hadoop.CarbonInputFormat
import org.apache.carbondata.processing.exception.MultipleMatchingException
import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CarbonDataMergerUtilResult, CompactionType}
import org.apache.carbondata.spark.DeleteDelataResultImpl
@@ -107,7 +108,7 @@ private[sql] case class ProjectForDeleteCommand(
CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString)
case e: Exception =>
- LOG.error("Exception in Delete data operation " + e.getMessage)
+ LOG.error(e, "Exception in Delete data operation " + e.getMessage)
// ****** start clean up.
// In case of failure, clean all related delete delta files
CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
@@ -548,7 +549,7 @@ object deleteExecution {
val (carbonInputFormat, job) =
QueryPlanUtil.createCarbonInputFormat(absoluteTableIdentifier)
-
+ CarbonInputFormat.setTableInfo(job.getConfiguration, carbonTable.getTableInfo)
val keyRdd = deleteRdd.map({ row =>
val tupleId: String = row
.getString(row.fieldIndex(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
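Reviewer note: with the schema now potentially living in Hive, the delete path
pushes the in-memory TableInfo into the Hadoop job configuration so
CarbonInputFormat does not have to re-read a schema file. A minimal sketch,
assuming carbonTable and absoluteTableIdentifier are in scope as in the hunk
above:

    import org.apache.carbondata.hadoop.CarbonInputFormat

    val (carbonInputFormat, job) =
      QueryPlanUtil.createCarbonInputFormat(absoluteTableIdentifier)
    // Serialize the table schema into the job conf so splits and readers can
    // reconstruct it without touching the store's schema file.
    CarbonInputFormat.setTableInfo(job.getConfiguration, carbonTable.getTableInfo)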
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index ce66733..88e89ad 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -29,14 +29,13 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.hive.{CarbonMetastore, CarbonRelation}
+import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.util.FileUtils
import org.codehaus.jackson.map.ObjectMapper
import org.apache.carbondata.api.CarbonStore
import org.apache.carbondata.common.constants.LoggerAction
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.dictionary.server.DictionaryServer
@@ -48,6 +47,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.format
import org.apache.carbondata.processing.constants.TableOptionConstant
import org.apache.carbondata.processing.etl.DataLoadingException
import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
@@ -186,23 +186,24 @@ case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends Ru
} else {
// Add Database to catalog and persist
val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
- // Need to fill partitioner class when we support partition
- val tablePath = catalog.createTableFromThrift(tableInfo, dbName, tbName)(sparkSession)
+ val (tablePath, carbonSchemaString) =
+ catalog.createTableFromThrift(tableInfo, dbName, tbName)(sparkSession)
if (createDSTable) {
try {
val fields = new Array[Field](cm.dimCols.size + cm.msrCols.size)
cm.dimCols.foreach(f => fields(f.schemaOrdinal) = f)
cm.msrCols.foreach(f => fields(f.schemaOrdinal) = f)
+
sparkSession.sql(
s"""CREATE TABLE $dbName.$tbName
|(${ fields.map(f => f.rawSchema).mkString(",") })
|USING org.apache.spark.sql.CarbonSource""".stripMargin +
- s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath "$tablePath") """)
+ s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath """.stripMargin +
+ s""""$tablePath" $carbonSchemaString) """)
} catch {
case e: Exception =>
val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
// call the drop table to delete the created table.
-
CarbonEnv.getInstance(sparkSession).carbonMetastore
.dropTable(catalog.storePath, identifier)(sparkSession)
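Reviewer note: this hunk is the heart of the hive-based schema storage.
createTableFromThrift now returns the serialized schema alongside the table
path, and the schema travels into the Hive metastore via the data-source
OPTIONS of the generated DDL. A rough illustration of the statement that ends
up being issued; the table, path and serialized payload below are hypothetical,
and the real contents of carbonSchemaString are whatever createTableFromThrift
produces:

    // Hypothetical example of the emitted DDL; the actual option keys and
    // serialized schema layout are defined by carbonSchemaString.
    sparkSession.sql(
      s"""CREATE TABLE mydb.sales
         |(id INT, name STRING)
         |USING org.apache.spark.sql.CarbonSource
         |OPTIONS (tableName "sales", dbName "mydb",
         |  tablePath "/store/mydb/sales" <serialized carbon schema options>)
       """.stripMargin)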
@@ -234,9 +235,9 @@ case class DeleteLoadsById(
def run(sparkSession: SparkSession): Seq[Row] = {
Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
- val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
- .getTableFromMetadata(getDB.getDatabaseName(databaseNameOp, sparkSession), tableName)
- .map(_.carbonTable).getOrElse(null)
+ val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
+ lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
+ tableMeta.carbonTable
CarbonStore.deleteLoadById(
loadids,
getDB.getDatabaseName(databaseNameOp, sparkSession),
@@ -259,9 +260,9 @@ case class DeleteLoadsByLoadDate(
def run(sparkSession: SparkSession): Seq[Row] = {
Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
- val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
- .getTableFromMetadata(getDB.getDatabaseName(databaseNameOp, sparkSession), tableName)
- .map(_.carbonTable).getOrElse(null)
+ val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
+ lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
+ tableMeta.carbonTable
CarbonStore.deleteLoadByDate(
loadDate,
getDB.getDatabaseName(databaseNameOp, sparkSession),
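Reviewer note: the getTableFromMetadata(...).map(_.carbonTable).getOrElse(null)
pattern is replaced by lookupRelation in DeleteLoadsById, DeleteLoadsByLoadDate,
DeleteLoadByDate and ShowLoads alike. A hypothetical helper (not part of this
commit) capturing the repeated shape:

    import org.apache.spark.sql.{CarbonEnv, SparkSession}
    import org.apache.spark.sql.hive.CarbonRelation
    import org.apache.carbondata.core.metadata.schema.table.CarbonTable

    // Resolves a CarbonTable through the metastore's relation lookup; unlike
    // the old pattern it fails on a missing table instead of returning null.
    def resolveCarbonTable(dbNameOp: Option[String], tableName: String)
        (sparkSession: SparkSession): CarbonTable =
      CarbonEnv.getInstance(sparkSession).carbonMetastore
        .lookupRelation(dbNameOp, tableName)(sparkSession)
        .asInstanceOf[CarbonRelation].tableMeta.carbonTable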
@@ -279,13 +280,13 @@ object LoadTable {
sqlContext: SQLContext,
model: DictionaryLoadModel,
noDictDimension: Array[CarbonDimension]): Unit = {
-
+ val sparkSession = sqlContext.sparkSession
val carbonTablePath = CarbonStorePath.getCarbonTablePath(model.hdfsLocation,
model.table)
- val schemaFilePath = carbonTablePath.getSchemaFilePath
+ val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
// read TableInfo
- val tableInfo = CarbonMetastore.readSchemaFileToThriftTable(schemaFilePath)
+ val tableInfo: format.TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
// modify TableInfo
val columns = tableInfo.getFact_table.getTable_columns
@@ -294,23 +295,21 @@ object LoadTable {
columns.get(i).encoders.remove(org.apache.carbondata.format.Encoding.DICTIONARY)
}
}
+ val entry = tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
+ entry.setTime_stamp(System.currentTimeMillis())
// write TableInfo
- CarbonMetastore.writeThriftTableToSchemaFile(schemaFilePath, tableInfo)
-
- val catalog = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore
+ metastore.updateTableSchema(carbonTablePath.getCarbonTableIdentifier,
+ carbonTablePath.getCarbonTableIdentifier,
+ tableInfo, entry, carbonTablePath.getPath)(sparkSession)
// update the schema modified time
- catalog.updateSchemasUpdatedTime(catalog.touchSchemaFileSystemTime(
+ metastore.updateAndTouchSchemasUpdatedTime(
carbonLoadModel.getDatabaseName,
- carbonLoadModel.getTableName))
-
- // update Metadata
- catalog.updateMetadataByThriftTable(schemaFilePath, tableInfo,
- model.table.getDatabaseName, model.table.getTableName, carbonLoadModel.getStorePath)
+ carbonLoadModel.getTableName)
// update CarbonDataLoadSchema
- val carbonTable = catalog.lookupRelation(Option(model.table.getDatabaseName),
+ val carbonTable = metastore.lookupRelation(Option(model.table.getDatabaseName),
model.table.getTableName)(sqlContext.sparkSession).asInstanceOf[CarbonRelation].tableMeta
.carbonTable
carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
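Reviewer note: the single-pass no-dictionary path no longer writes the schema
file directly. Consolidating the scattered +/- lines of the hunk above, the new
flow is: read the Thrift TableInfo through the metastore, mutate it, stamp the
existing schema-evolution entry, write it back via updateTableSchema, and touch
the schema-modified time. A sketch of that sequence (all names from the hunk
above):

    // Consolidated "after" view of the hunk; assumes metastore, carbonTablePath,
    // carbonLoadModel and sparkSession are in scope.
    val tableInfo: format.TableInfo =
      metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
    // ... remove the DICTIONARY encoding from the no-dictionary columns ...
    val entry = tableInfo.getFact_table.getSchema_evolution
      .getSchema_evolution_history.get(0)
    entry.setTime_stamp(System.currentTimeMillis())
    metastore.updateTableSchema(carbonTablePath.getCarbonTableIdentifier,
      carbonTablePath.getCarbonTableIdentifier,
      tableInfo, entry, carbonTablePath.getPath)(sparkSession)
    metastore.updateAndTouchSchemasUpdatedTime(
      carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)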
@@ -628,6 +627,14 @@ case class LoadTable(
LOGGER.audit(s"Cannot use single_pass=true for $dbName.$tableName during the first load")
carbonLoadModel.setUseOnePass(false)
}
+ // Create the table and metadata folders if they do not exist
+ val carbonTablePath = CarbonStorePath
+ .getCarbonTablePath(storePath, table.getCarbonTableIdentifier)
+ val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath
+ val fileType = FileFactory.getFileType(metadataDirectoryPath)
+ if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
+ FileFactory.mkdirs(metadataDirectoryPath, fileType)
+ }
if (carbonLoadModel.getUseOnePass) {
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier
@@ -691,8 +698,7 @@ case class LoadTable(
server,
dataFrame,
updateModel)
- }
- else {
+ } else {
val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) {
val fields = dataFrame.get.schema.fields
import org.apache.spark.sql.functions.udf
@@ -726,6 +732,7 @@ case class LoadTable(
} else {
(dataFrame, dataFrame)
}
+
GlobalDictionaryUtil
.generateGlobalDictionary(
sparkSession.sqlContext,
@@ -794,9 +801,9 @@ private[sql] case class DeleteLoadByDate(
def run(sparkSession: SparkSession): Seq[Row] = {
Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
- val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
- .getTableFromMetadata(getDB.getDatabaseName(databaseNameOp, sparkSession), tableName)
- .map(_.carbonTable).getOrElse(null)
+ val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
+ lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
+ tableMeta.carbonTable
CarbonStore.deleteLoadByDate(
loadDate,
getDB.getDatabaseName(databaseNameOp, sparkSession),
@@ -818,9 +825,7 @@ case class CleanFiles(
val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
val relation = catalog
.lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]
- val carbonTable = catalog
- .getTableFromMetadata(getDB.getDatabaseName(databaseNameOp, sparkSession), tableName)
- .map(_.carbonTable).getOrElse(null)
+ val carbonTable = relation.tableMeta.carbonTable
CarbonStore.cleanFiles(
getDB.getDatabaseName(databaseNameOp, sparkSession),
tableName,
@@ -839,9 +844,9 @@ case class ShowLoads(
def run(sparkSession: SparkSession): Seq[Row] = {
Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
- val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
- .getTableFromMetadata(getDB.getDatabaseName(databaseNameOp, sparkSession), tableName)
- .map(_.carbonTable).getOrElse(null)
+ val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
+ lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
+ tableMeta.carbonTable
CarbonStore.showSegments(
getDB.getDatabaseName(databaseNameOp, sparkSession),
tableName,
@@ -867,15 +872,11 @@ case class CarbonDropTableCommand(ifExistsSet: Boolean,
catalog.checkSchemasModifiedTimeAndReloadTables()
val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer()
try {
- val carbonTable = catalog.getTableFromMetadata(dbName, tableName).map(_.carbonTable).orNull
- locksToBeAcquired foreach {
- lock => carbonLocks += CarbonLockUtil.getLockObject(carbonTable, lock)
+ locksToBeAcquired foreach {
+ lock => carbonLocks += CarbonLockUtil.getLockObject(carbonTableIdentifier, lock)
}
LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
- if (null != carbonTable) {
- // clear driver B-tree and dictionary cache
- ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)
- }
+
CarbonEnv.getInstance(sparkSession).carbonMetastore
.dropTable(storePath, identifier)(sparkSession)
LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]")