You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2018/04/17 06:55:14 UTC
carbondata git commit: Blockletsize and Blocksize issue fix in sdk
writer and other unmanaged table fixes
Repository: carbondata
Updated Branches:
refs/heads/master 78e4d0da3 -> cf1e4d4ca
Blockletsize and Blocksize issue fix in sdk writer and other unmanaged table fixes
*Decimal datatype issue fix in sdk writer
*Drop unmanaged table issue in cluster
*Added comment for SDK writer API methods
*Query two writer's output at same path issue fix
*Block alter table rename for unmanaged table
This closes #2141
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/cf1e4d4c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/cf1e4d4c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/cf1e4d4c
Branch: refs/heads/master
Commit: cf1e4d4ca7cef8bc49b4eb7b811af5c1bd787cba
Parents: 78e4d0d
Author: ajantha-bhat <aj...@gmail.com>
Authored: Fri Apr 6 14:10:38 2018 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Tue Apr 17 12:21:33 2018 +0530
----------------------------------------------------------------------
.../schema/table/TableSchemaBuilder.java | 26 ++++++-
.../executor/impl/AbstractQueryExecutor.java | 7 +-
.../scan/executor/util/RestructureUtil.java | 29 ++++++--
.../scan/executor/util/RestructureUtilTest.java | 4 +-
.../createTable/TestUnmanagedCarbonTable.scala | 77 +++++++++++++++-----
.../schema/CarbonAlterTableRenameCommand.scala | 8 ++
.../spark/sql/hive/CarbonFileMetastore.scala | 21 +-----
.../sdk/file/CarbonWriterBuilder.java | 51 +++++++++++--
8 files changed, 172 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cf1e4d4c/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
index 2dd5a9e..7c2e54d 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
@@ -26,8 +26,10 @@ import java.util.Objects;
import java.util.UUID;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.DecimalType;
import org.apache.carbondata.core.metadata.datatype.StructField;
import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.SchemaEvolution;
@@ -47,6 +49,8 @@ public class TableSchemaBuilder {
private int blockSize;
+ private int blockletSize;
+
private String tableName;
public TableSchemaBuilder blockSize(int blockSize) {
@@ -57,6 +61,14 @@ public class TableSchemaBuilder {
return this;
}
+ public TableSchemaBuilder blockletSize(int blockletSize) {
+ if (blockletSize <= 0) {
+ throw new IllegalArgumentException("blockletSize should be greater than 0");
+ }
+ this.blockletSize = blockletSize;
+ return this;
+ }
+
public TableSchemaBuilder tableName(String tableName) {
Objects.requireNonNull(tableName);
this.tableName = tableName;
@@ -76,11 +88,18 @@ public class TableSchemaBuilder {
allColumns.addAll(otherColumns);
schema.setListOfColumns(allColumns);
+ Map<String, String> property = new HashMap<>();
if (blockSize > 0) {
- Map<String, String> property = new HashMap<>();
property.put(CarbonCommonConstants.TABLE_BLOCKSIZE, String.valueOf(blockSize));
+ }
+ if (blockletSize > 0) {
+ property.put(CarbonV3DataFormatConstants.BLOCKLET_SIZE_IN_MB, String.valueOf(blockletSize));
+ }
+ // TODO: check other table properties
+ if (property.size() != 0) {
schema.setTableProperties(property);
}
+
return schema;
}
@@ -103,6 +122,11 @@ public class TableSchemaBuilder {
newColumn.setColumnUniqueId(UUID.randomUUID().toString());
newColumn.setColumnReferenceId(newColumn.getColumnUniqueId());
newColumn.setEncodingList(createEncoding(field.getDataType(), isSortColumn));
+ if (DataTypes.isDecimal(field.getDataType())) {
+ DecimalType decimalType = (DecimalType) field.getDataType();
+ newColumn.setPrecision(decimalType.getPrecision());
+ newColumn.setScale(decimalType.getScale());
+ }
if (isSortColumn) {
sortColumns.add(newColumn);
newColumn.setSortColumn(true);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cf1e4d4c/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index f0f5bce..d2d458e 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -308,7 +308,8 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
List<ProjectionDimension> projectDimensions = RestructureUtil
.createDimensionInfoAndGetCurrentBlockQueryDimension(blockExecutionInfo,
queryModel.getProjectionDimensions(), tableBlockDimensions,
- segmentProperties.getComplexDimensions(), queryModel.getProjectionMeasures().size());
+ segmentProperties.getComplexDimensions(), queryModel.getProjectionMeasures().size(),
+ queryModel.getTable().getTableInfo().isUnManagedTable());
blockExecutionInfo.setBlockId(
CarbonUtil.getBlockId(queryModel.getAbsoluteTableIdentifier(), filePath, segmentId,
queryModel.getTable().getTableInfo().isUnManagedTable()));
@@ -517,7 +518,9 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
// getting the measure info which will be used while filling up measure data
List<ProjectionMeasure> updatedQueryMeasures = RestructureUtil
.createMeasureInfoAndGetCurrentBlockQueryMeasures(executionInfo,
- queryModel.getProjectionMeasures(), tableBlock.getSegmentProperties().getMeasures());
+ queryModel.getProjectionMeasures(),
+ tableBlock.getSegmentProperties().getMeasures(),
+ queryModel.getTable().getTableInfo().isUnManagedTable());
// setting the measure aggregator for all aggregation function selected
// in query
executionInfo.getMeasureInfo().setMeasureDataTypes(queryProperties.measureDataTypes);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cf1e4d4c/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
index 9c1f2f8..3b477ab 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
@@ -28,6 +28,7 @@ import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionary
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
@@ -57,12 +58,13 @@ public class RestructureUtil {
* @param queryDimensions
* @param tableBlockDimensions
* @param tableComplexDimension
+ * @param isUnManagedTable
* @return list of query dimension which is present in the table block
*/
public static List<ProjectionDimension> createDimensionInfoAndGetCurrentBlockQueryDimension(
BlockExecutionInfo blockExecutionInfo, List<ProjectionDimension> queryDimensions,
List<CarbonDimension> tableBlockDimensions, List<CarbonDimension> tableComplexDimension,
- int measureCount) {
+ int measureCount, boolean isUnManagedTable) {
List<ProjectionDimension> presentDimension =
new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
boolean[] isDimensionExists = new boolean[queryDimensions.size()];
@@ -82,7 +84,7 @@ public class RestructureUtil {
queryDimension.getDimension().getDataType();
} else {
for (CarbonDimension tableDimension : tableBlockDimensions) {
- if (tableDimension.getColumnId().equals(queryDimension.getDimension().getColumnId())) {
+ if (isColumnMatches(isUnManagedTable, queryDimension.getDimension(), tableDimension)) {
ProjectionDimension currentBlockDimension = new ProjectionDimension(tableDimension);
tableDimension.getColumnSchema()
.setPrecision(queryDimension.getDimension().getColumnSchema().getPrecision());
@@ -104,7 +106,7 @@ public class RestructureUtil {
continue;
}
for (CarbonDimension tableDimension : tableComplexDimension) {
- if (tableDimension.getColumnId().equals(queryDimension.getDimension().getColumnId())) {
+ if (isColumnMatches(isUnManagedTable, queryDimension.getDimension(), tableDimension)) {
ProjectionDimension currentBlockDimension = new ProjectionDimension(tableDimension);
// TODO: for complex dimension set scale and precision by traversing
// the child dimensions
@@ -141,6 +143,22 @@ public class RestructureUtil {
}
/**
+ * Match the columns for managed and unmanaged tables
+ * @param isUnManagedTable
+ * @param queryColumn
+ * @param tableColumn
+ * @return
+ */
+ private static boolean isColumnMatches(boolean isUnManagedTable,
+ CarbonColumn queryColumn, CarbonColumn tableColumn) {
+ // If it is unmanaged table just check the column names, no need to validate column id as
+ // multiple sdk's output placed in a single folder doesn't have same column ID but can
+ // have same column name
+ return (tableColumn.getColumnId().equals(queryColumn.getColumnId()) ||
+ (isUnManagedTable && tableColumn.getColName().equals(queryColumn.getColName())));
+ }
+
+ /**
* This method will validate and return the default value to be
* filled at the time of result preparation
*
@@ -337,11 +355,12 @@ public class RestructureUtil {
* @param blockExecutionInfo
* @param queryMeasures measures present in query
* @param currentBlockMeasures current block measures
+ * @param isUnManagedTable
* @return measures present in the block
*/
public static List<ProjectionMeasure> createMeasureInfoAndGetCurrentBlockQueryMeasures(
BlockExecutionInfo blockExecutionInfo, List<ProjectionMeasure> queryMeasures,
- List<CarbonMeasure> currentBlockMeasures) {
+ List<CarbonMeasure> currentBlockMeasures, boolean isUnManagedTable) {
MeasureInfo measureInfo = new MeasureInfo();
List<ProjectionMeasure> presentMeasure = new ArrayList<>(queryMeasures.size());
int numberOfMeasureInQuery = queryMeasures.size();
@@ -354,7 +373,7 @@ public class RestructureUtil {
// then setting measure exists is true
// otherwise adding a default value of a measure
for (CarbonMeasure carbonMeasure : currentBlockMeasures) {
- if (carbonMeasure.getColumnId().equals(queryMeasure.getMeasure().getColumnId())) {
+ if (isColumnMatches(isUnManagedTable, carbonMeasure, queryMeasure.getMeasure())) {
ProjectionMeasure currentBlockMeasure = new ProjectionMeasure(carbonMeasure);
carbonMeasure.getColumnSchema().setDataType(queryMeasure.getMeasure().getDataType());
carbonMeasure.getColumnSchema().setPrecision(queryMeasure.getMeasure().getPrecision());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cf1e4d4c/core/src/test/java/org/apache/carbondata/core/scan/executor/util/RestructureUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/scan/executor/util/RestructureUtilTest.java b/core/src/test/java/org/apache/carbondata/core/scan/executor/util/RestructureUtilTest.java
index 163580d..cb80cd3 100644
--- a/core/src/test/java/org/apache/carbondata/core/scan/executor/util/RestructureUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/scan/executor/util/RestructureUtilTest.java
@@ -92,7 +92,7 @@ public class RestructureUtilTest {
List<ProjectionDimension> result = null;
result = RestructureUtil
.createDimensionInfoAndGetCurrentBlockQueryDimension(blockExecutionInfo, queryDimensions,
- tableBlockDimensions, tableComplexDimensions, queryMeasures.size());
+ tableBlockDimensions, tableComplexDimensions, queryMeasures.size(), false);
List<CarbonDimension> resultDimension = new ArrayList<>(result.size());
for (ProjectionDimension queryDimension : result) {
resultDimension.add(queryDimension.getDimension());
@@ -127,7 +127,7 @@ public class RestructureUtilTest {
List<ProjectionMeasure> queryMeasures = Arrays.asList(queryMeasure1, queryMeasure2, queryMeasure3);
BlockExecutionInfo blockExecutionInfo = new BlockExecutionInfo();
RestructureUtil.createMeasureInfoAndGetCurrentBlockQueryMeasures(blockExecutionInfo, queryMeasures,
- currentBlockMeasures);
+ currentBlockMeasures, false);
MeasureInfo measureInfo = blockExecutionInfo.getMeasureInfo();
boolean[] measuresExist = { true, true, false };
assertThat(measureInfo.getMeasureExists(), is(equalTo(measuresExist)));
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cf1e4d4c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestUnmanagedCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestUnmanagedCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestUnmanagedCarbonTable.scala
index bfb9471..a6ee807 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestUnmanagedCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestUnmanagedCarbonTable.scala
@@ -43,11 +43,24 @@ class TestUnmanagedCarbonTable extends QueryTest with BeforeAndAfterAll {
//getCanonicalPath gives path with \, so code expects /. Need to handle in code ?
writerPath = writerPath.replace("\\", "/");
- // prepare SDK writer output
- def buildTestData(persistSchema: Boolean, outputMultipleFiles: Boolean): Any = {
+ def buildTestDataSingleFile(): Any = {
+ FileUtils.deleteDirectory(new File(writerPath))
+ buildTestData(3,false)
+ }
+
+ def buildTestDataMultipleFiles(): Any = {
+ FileUtils.deleteDirectory(new File(writerPath))
+ buildTestData(1000000,false)
+ }
+ def buildTestDataTwice(): Any = {
FileUtils.deleteDirectory(new File(writerPath))
+ buildTestData(3,false)
+ buildTestData(3,false)
+ }
+ // prepare sdk writer output
+ def buildTestData(rows:Int, persistSchema:Boolean): Any = {
val schema = new StringBuilder()
.append("[ \n")
.append(" {\"name\":\"string\"},\n")
@@ -68,15 +81,11 @@ class TestUnmanagedCarbonTable extends QueryTest with BeforeAndAfterAll {
} else {
builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).unManagedTable(true)
.uniqueIdentifier(
- System.currentTimeMillis).withBlockSize(1).withBlockletSize(1)
+ System.currentTimeMillis).withBlockSize(2)
.buildWriterForCSVInput()
}
var i = 0
- var row = 3
- if (outputMultipleFiles) {
- row = 1000000
- }
- while (i < row) {
+ while (i < rows) {
writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
i += 1
}
@@ -116,7 +125,7 @@ class TestUnmanagedCarbonTable extends QueryTest with BeforeAndAfterAll {
test("test create External Table with Schema with partition, should ignore schema and partition")
{
- buildTestData(false, false)
+ buildTestDataSingleFile()
assert(new File(writerPath).exists())
sql("DROP TABLE IF EXISTS sdkOutputTable")
@@ -137,7 +146,7 @@ class TestUnmanagedCarbonTable extends QueryTest with BeforeAndAfterAll {
}
test("read unmanaged table, files written from sdk Writer Output)") {
- buildTestData(false, false)
+ buildTestDataSingleFile()
assert(new File(writerPath).exists())
sql("DROP TABLE IF EXISTS sdkOutputTable1")
@@ -178,7 +187,7 @@ class TestUnmanagedCarbonTable extends QueryTest with BeforeAndAfterAll {
}
test("Test Blocked operations for unmanaged table ") {
- buildTestData(false, false)
+ buildTestDataSingleFile()
assert(new File(writerPath).exists())
sql("DROP TABLE IF EXISTS sdkOutputTable")
@@ -244,14 +253,14 @@ class TestUnmanagedCarbonTable extends QueryTest with BeforeAndAfterAll {
assert(exception.getMessage()
.contains("Unsupported operation on unmanaged table"))
- //9. Update Segment
+ //9. Update column
exception = intercept[MalformedCarbonCommandException] {
sql("UPDATE sdkOutputTable SET (age) = (age + 9) ").show(false)
}
assert(exception.getMessage()
.contains("Unsupported operation on unmanaged table"))
- //10. Delete Segment
+ //10. Delete column
exception = intercept[MalformedCarbonCommandException] {
sql("DELETE FROM sdkOutputTable where name='robot1'").show(false)
}
@@ -266,16 +275,23 @@ class TestUnmanagedCarbonTable extends QueryTest with BeforeAndAfterAll {
.contains("Unsupported operation on unmanaged table"))
//12. Streaming table creation
- // External table don't accept table properties
+ // No need as External table don't accept table properties
+
+ //13. Alter table rename command
+ exception = intercept[MalformedCarbonCommandException] {
+ sql("ALTER TABLE sdkOutputTable RENAME to newTable")
+ }
+ assert(exception.getMessage()
+ .contains("Unsupported operation on unmanaged table"))
sql("DROP TABLE sdkOutputTable")
- // drop table should not delete the files
+ //drop table should not delete the files
assert(new File(writerPath).exists())
cleanTestData()
}
test("test create External Table With Schema, should ignore the schema provided") {
- buildTestData(false, false)
+ buildTestDataSingleFile()
assert(new File(writerPath).exists())
sql("DROP TABLE IF EXISTS sdkOutputTable")
@@ -296,7 +312,7 @@ class TestUnmanagedCarbonTable extends QueryTest with BeforeAndAfterAll {
}
test("Read sdk writer output file without Carbondata file should fail") {
- buildTestData(false, false)
+ buildTestDataSingleFile()
deleteFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
assert(new File(writerPath).exists())
sql("DROP TABLE IF EXISTS sdkOutputTable")
@@ -317,7 +333,7 @@ class TestUnmanagedCarbonTable extends QueryTest with BeforeAndAfterAll {
test("Read sdk writer output file without any file should fail") {
- buildTestData(false, false)
+ buildTestDataSingleFile()
deleteFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
deleteFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
assert(new File(writerPath).exists())
@@ -340,7 +356,7 @@ class TestUnmanagedCarbonTable extends QueryTest with BeforeAndAfterAll {
}
test("Read sdk writer output multiple files ") {
- buildTestData(false, true)
+ buildTestDataMultipleFiles()
assert(new File(writerPath).exists())
val folder = new File(writerPath)
val dataFiles = folder.listFiles(new FileFilter() {
@@ -365,5 +381,28 @@ class TestUnmanagedCarbonTable extends QueryTest with BeforeAndAfterAll {
cleanTestData()
}
+ test("Read two sdk writer outputs with same column name placed in same folder") {
+ buildTestDataTwice()
+ assert(new File(writerPath).exists())
+
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+ sql(
+ s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+ |'$writerPath' """.stripMargin)
+
+
+ checkAnswer(sql("select * from sdkOutputTable"), Seq(Row("robot0", 0, 0.0),
+ Row("robot1", 1, 0.5),
+ Row("robot2", 2, 1.0),
+ Row("robot0", 0, 0.0),
+ Row("robot1", 1, 0.5),
+ Row("robot2", 2, 1.0)))
+
+ // drop table should not delete the files
+ assert(new File(writerPath).exists())
+ cleanTestData()
+ }
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cf1e4d4c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index 05c0059..e349e93 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -73,6 +73,14 @@ private[sql] case class CarbonAlterTableRenameCommand(
s"Table $oldDatabaseName.$oldTableName does not exist")
throwMetadataException(oldDatabaseName, oldTableName, "Table does not exist")
}
+
+ var oldCarbonTable: CarbonTable = null
+ oldCarbonTable = metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession)
+ .asInstanceOf[CarbonRelation].carbonTable
+ if (oldCarbonTable.getTableInfo.isUnManagedTable) {
+ throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
+ }
+
val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
LockUsage.COMPACTION_LOCK,
LockUsage.DELETE_SEGMENT_LOCK,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cf1e4d4c/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 0075c13..36b6d96 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -474,7 +474,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier)
} else {
- if (isUnmanagedCarbonTable(absoluteTableIdentifier, sparkSession)) {
+ if (isUnmanagedCarbonTable(absoluteTableIdentifier)) {
removeTableFromMetadata(dbName, tableName)
CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
// discard cached table info in cachedDataSourceTables
@@ -486,22 +486,9 @@ class CarbonFileMetastore extends CarbonMetaStore {
}
- def isUnmanagedCarbonTable(identifier: AbsoluteTableIdentifier,
- sparkSession: SparkSession): Boolean = {
- if (sparkSession.sessionState.catalog.listTables(identifier.getDatabaseName)
- .exists(_.table.equalsIgnoreCase(identifier.getTableName))) {
-
- val table = sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
- .getCarbonEnv().carbonMetastore
- .getTableFromMetadataCache(identifier.getDatabaseName, identifier.getTableName)
-
- table match {
- case null => false
- case _ => table.get.getTableInfo.isUnManagedTable
- }
- } else {
- false
- }
+ def isUnmanagedCarbonTable(identifier: AbsoluteTableIdentifier): Boolean = {
+ val table = getTableFromMetadataCache(identifier.getDatabaseName, identifier.getTableName);
+ table.map(_.getTableInfo.isUnManagedTable).getOrElse(false)
}
private def getTimestampFileAndType() = {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cf1e4d4c/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index f70e165..4e09553 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -58,44 +58,76 @@ public class CarbonWriterBuilder {
private boolean isUnManagedTable;
private long UUID;
+ /**
+ * prepares the builder with the schema provided
+ * @param schema is instance of Schema
+ * @return updated CarbonWriterBuilder
+ */
public CarbonWriterBuilder withSchema(Schema schema) {
Objects.requireNonNull(schema, "schema should not be null");
this.schema = schema;
return this;
}
+ /**
+ * Sets the output path of the writer builder
+ * @param path is the absolute path where output files are written
+ * @return updated CarbonWriterBuilder
+ */
public CarbonWriterBuilder outputPath(String path) {
Objects.requireNonNull(path, "path should not be null");
this.path = path;
return this;
}
+ /**
+ * sets the list of columns that needs to be in sorted order
+ * @param sortColumns is a string array of columns that needs to be sorted
+ * @return updated CarbonWriterBuilder
+ */
public CarbonWriterBuilder sortBy(String[] sortColumns) {
this.sortColumns = sortColumns;
return this;
}
- public CarbonWriterBuilder partitionBy(String[] partitionColumns) {
- throw new UnsupportedOperationException();
- }
-
+ /**
+ * If set, create a schema file in metadata folder.
+ * @param persist is a boolean value, If set, create a schema file in metadata folder
+ * @return updated CarbonWriterBuilder
+ */
public CarbonWriterBuilder persistSchemaFile(boolean persist) {
this.persistSchemaFile = persist;
return this;
}
+ /**
+ * If set true, writes the carbondata and carbonindex files in a flat folder structure
+ * @param isUnManagedTable is a boolelan value if set writes
+ * the carbondata and carbonindex files in a flat folder structure
+ * @return updated CarbonWriterBuilder
+ */
public CarbonWriterBuilder unManagedTable(boolean isUnManagedTable) {
Objects.requireNonNull(isUnManagedTable, "UnManaged Table should not be null");
this.isUnManagedTable = isUnManagedTable;
return this;
}
+ /**
+ * to set the timestamp in the carbondata and carbonindex index files
+ * @param UUID is a timestamp to be used in the carbondata and carbonindex index files
+ * @return updated CarbonWriterBuilder
+ */
public CarbonWriterBuilder uniqueIdentifier(long UUID) {
Objects.requireNonNull(UUID, "Unique Identifier should not be null");
this.UUID = UUID;
return this;
}
+ /**
+ * To set the carbondata file size in MB between 1MB-2048MB
+ * @param blockSize is size in MB between 1MB to 2048 MB
+ * @return updated CarbonWriterBuilder
+ */
public CarbonWriterBuilder withBlockSize(int blockSize) {
if (blockSize <= 0 || blockSize > 2048) {
throw new IllegalArgumentException("blockSize should be between 1 MB to 2048 MB");
@@ -104,6 +136,11 @@ public class CarbonWriterBuilder {
return this;
}
+ /**
+ * To set the blocklet size of carbondata file
+ * @param blockletSize is blocklet size in MB
+ * @return updated CarbonWriterBuilder
+ */
public CarbonWriterBuilder withBlockletSize(int blockletSize) {
if (blockletSize <= 0) {
throw new IllegalArgumentException("blockletSize should be greater than zero");
@@ -151,10 +188,14 @@ public class CarbonWriterBuilder {
*/
private CarbonTable buildCarbonTable() {
TableSchemaBuilder tableSchemaBuilder = TableSchema.builder();
- if (blockletSize > 0) {
+ if (blockSize > 0) {
tableSchemaBuilder = tableSchemaBuilder.blockSize(blockSize);
}
+ if (blockletSize > 0) {
+ tableSchemaBuilder = tableSchemaBuilder.blockletSize(blockletSize);
+ }
+
List<String> sortColumnsList;
if (sortColumns != null) {
sortColumnsList = Arrays.asList(sortColumns);