You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2018/04/22 04:15:36 UTC
[2/2] carbondata git commit: [CARBONDATA-2360][Non Transactional
Table] Insert into Non-Transactional Table
[CARBONDATA-2360][Non Transactional Table] Insert into Non-Transactional Table
Also supports overwrite clause
This closes #2177
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b7b8073d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b7b8073d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b7b8073d
Branch: refs/heads/master
Commit: b7b8073d6ae729a4f9b73376ad5fc111c66efe3d
Parents: b86ff92
Author: sounakr <so...@gmail.com>
Authored: Tue Apr 17 13:38:51 2018 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Sun Apr 22 09:43:02 2018 +0530
----------------------------------------------------------------------
.../core/metadata/schema/SchemaReader.java | 1 +
.../core/metadata/schema/table/CarbonTable.java | 22 +-
.../schema/table/CarbonTableBuilder.java | 13 +-
.../core/metadata/schema/table/TableInfo.java | 26 +-
.../LatestFilesReadCommittedScope.java | 2 +-
.../executor/impl/AbstractQueryExecutor.java | 6 +-
.../scan/executor/util/RestructureUtil.java | 30 +-
.../SegmentUpdateStatusManager.java | 2 +-
.../apache/carbondata/core/util/CarbonUtil.java | 5 +-
.../hadoop/api/CarbonInputFormat.java | 8 +-
.../hadoop/api/CarbonOutputCommitter.java | 10 +-
.../hadoop/api/CarbonTableInputFormat.java | 10 +-
.../hadoop/api/CarbonTableOutputFormat.java | 3 +
.../hadoop/testutil/StoreCreator.java | 1 +
.../hadoop/util/CarbonInputFormatUtil.java | 3 +-
.../presto/util/CarbonDataStoreCreator.scala | 1 +
.../src/test/resources/nontransactional.csv | 3 +
...FileInputFormatWithExternalCarbonTable.scala | 2 +-
...tCreateTableUsingSparkCarbonFileFormat.scala | 2 +-
.../TestNonTransactionalCarbonTable.scala | 516 +++++++++++++++++++
...tSparkCarbonFileFormatWithSparkSession.scala | 2 +-
.../createTable/TestUnmanagedCarbonTable.scala | 408 ---------------
.../carbondata/spark/rdd/CarbonScanRDD.scala | 2 +-
.../spark/rdd/CarbonDataRDDFactory.scala | 24 +-
.../org/apache/spark/sql/CarbonCountStar.scala | 3 +-
.../org/apache/spark/sql/CarbonSource.scala | 14 +-
.../datamap/CarbonCreateDataMapCommand.scala | 4 +-
.../CarbonAlterTableCompactionCommand.scala | 5 +-
.../CarbonDeleteLoadByIdCommand.scala | 4 +-
.../CarbonDeleteLoadByLoadDateCommand.scala | 4 +-
.../management/CarbonLoadDataCommand.scala | 15 +-
.../management/CarbonShowLoadsCommand.scala | 4 +-
.../CarbonProjectForDeleteCommand.scala | 4 +-
.../CarbonProjectForUpdateCommand.scala | 4 +-
...arbonAlterTableAddHivePartitionCommand.scala | 1 +
.../CarbonAlterTableDropPartitionCommand.scala | 1 +
.../CarbonAlterTableSplitPartitionCommand.scala | 1 +
.../schema/CarbonAlterTableRenameCommand.scala | 4 +-
.../table/CarbonCreateTableCommand.scala | 4 +-
.../command/table/CarbonDropTableCommand.scala | 11 +-
.../sql/execution/strategy/DDLStrategy.scala | 25 +-
.../spark/sql/hive/CarbonFileMetastore.scala | 12 +-
.../spark/sql/parser/CarbonSparkSqlParser.scala | 6 +-
.../util/ExternalColumnDictionaryTestCase.scala | 1 +
.../loading/CarbonDataLoadConfiguration.java | 10 +-
.../loading/DataLoadProcessBuilder.java | 2 +-
.../loading/model/CarbonLoadModel.java | 18 +-
.../loading/model/CarbonLoadModelBuilder.java | 9 +-
.../processing/loading/model/LoadOption.java | 2 +-
.../store/CarbonFactDataHandlerModel.java | 2 +-
.../processing/util/CarbonLoaderUtil.java | 45 +-
.../carbondata/processing/StoreCreator.java | 1 +
.../sdk/file/CarbonWriterBuilder.java | 36 +-
.../sdk/file/AvroCarbonWriterTest.java | 1 +
.../sdk/file/CSVCarbonWriterTest.java | 1 +
.../CSVNonTransactionalCarbonWriterTest.java | 278 ++++++++++
.../sdk/file/CSVUnManagedCarbonWriterTest.java | 277 ----------
.../apache/carbondata/sdk/file/TestUtil.java | 1 +
58 files changed, 1066 insertions(+), 846 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
index 8692f13..be3906b 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
@@ -91,6 +91,7 @@ public class SchemaReader {
TableInfo wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
tableInfo, identifier.getDatabaseName(), identifier.getTableName(),
identifier.getTablePath());
+ wrapperTableInfo.setTransactionalTable(false);
return wrapperTableInfo;
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 88e00f3..f0ab857 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -142,12 +142,16 @@ public class CarbonTable implements Serializable {
private boolean hasDataMapSchema;
/**
- * The boolean field which points if the data written for UnManaged Table
- * or Managed Table. The difference between managed and unManaged table is
- * unManaged Table will not contain any Metadata folder and subsequently
+ * The boolean field which points if the data written for Non Transactional Table
+ * or Transactional Table.
+ * A transactional table means Carbon will provide transactional support when the user performs
+ * data management operations such as data loading; whether the operation succeeds or fails,
+ * the data will be left in a consistent state.
+ * The difference between Transactional and non Transactional table is
+ * non Transactional Table will not contain any Metadata folder and subsequently
* no TableStatus or Schema files.
*/
- private boolean isUnManagedTable;
+ private boolean isTransactionalTable = true;
private CarbonTable() {
this.tableDimensionsMap = new HashMap<String, List<CarbonDimension>>();
@@ -247,7 +251,7 @@ public class CarbonTable implements Serializable {
table.blockSize = tableInfo.getTableBlockSizeInMB();
table.tableLastUpdatedTime = tableInfo.getLastUpdatedTime();
table.tableUniqueName = tableInfo.getTableUniqueName();
- table.setUnManagedTable(tableInfo.isUnManagedTable());
+ table.setTransactionalTable(tableInfo.isTransactionalTable());
table.fillDimensionsAndMeasuresForTables(tableInfo.getFactTable());
table.fillCreateOrderColumn(tableInfo.getFactTable().getTableName());
if (tableInfo.getFactTable().getBucketingInfo() != null) {
@@ -939,11 +943,11 @@ public class CarbonTable implements Serializable {
return new CarbonTableBuilder();
}
- public boolean isUnManagedTable() {
- return isUnManagedTable;
+ public boolean isTransactionalTable() {
+ return isTransactionalTable;
}
- public void setUnManagedTable(boolean unManagedTable) {
- isUnManagedTable = unManagedTable;
+ public void setTransactionalTable(boolean transactionalTable) {
+ isTransactionalTable = transactionalTable;
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableBuilder.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableBuilder.java
index 82d0246..e1d2162 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableBuilder.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableBuilder.java
@@ -28,7 +28,7 @@ public class CarbonTableBuilder {
private String tableName;
private String databaseName;
private String tablePath;
- private boolean unManagedTable;
+ private boolean isTransactionalTable;
private TableSchema tableSchema;
public CarbonTableBuilder tableName(String tableName) {
@@ -47,10 +47,9 @@ public class CarbonTableBuilder {
return this;
}
-
- public CarbonTableBuilder isUnManagedTable(boolean isUnManagedTable) {
- Objects.requireNonNull(isUnManagedTable, "UnManaged Table should not be null");
- this.unManagedTable = isUnManagedTable;
+ public CarbonTableBuilder isTransactionalTable(boolean isTransactionalTable) {
+ Objects.requireNonNull(isTransactionalTable, "Transactional Table should not be null");
+ this.isTransactionalTable = isTransactionalTable;
return this;
}
@@ -63,7 +62,7 @@ public class CarbonTableBuilder {
public CarbonTable build() {
Objects.requireNonNull(tablePath, "tablePath should not be null");
Objects.requireNonNull(tableSchema, "tableSchema should not be null");
- Objects.requireNonNull(unManagedTable, "UnManaged Table should not be null");
+ Objects.requireNonNull(isTransactionalTable, "Transactional Table should not be null");
TableInfo tableInfo = new TableInfo();
@@ -71,7 +70,7 @@ public class CarbonTableBuilder {
tableInfo.setTableUniqueName(databaseName + "_" + tableName);
tableInfo.setFactTable(tableSchema);
tableInfo.setTablePath(tablePath);
- tableInfo.setUnManagedTable(unManagedTable);
+ tableInfo.setTransactionalTable(isTransactionalTable);
tableInfo.setLastUpdatedTime(System.currentTimeMillis());
tableInfo.setDataMapSchemaList(new ArrayList<DataMapSchema>(0));
return CarbonTable.buildFromTableInfo(tableInfo);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
index 3e7ea62..c8ac15a 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
@@ -78,12 +78,17 @@ public class TableInfo implements Serializable, Writable {
private String tablePath;
/**
- * The boolean field which points if the data written for UnManaged Table
- * or Managed Table. The difference between managed and unManaged table is
- * unManaged Table will not contain any Metadata folder and subsequently
+ * The boolean field which points if the data written for Non Transactional Table
+ * or Transactional Table. The difference between Transactional and Non Transactional table is
+ * Non Transactional Table will not contain any Metadata folder and subsequently
* no TableStatus or Schema files.
+ * All ACID properties cannot be applied to a Non Transactional Table as there are no commit
+ * points, i.e. no TableStatus file.
+ * Whatever files are present in the path will be read, but the system does not ensure ACID
+ * rules for this data, mostly the Consistency part.
+ *
*/
- private boolean isUnManagedTable;
+ private boolean isTransactionalTable = true;
// this identifier is a lazy field which will be created when it is used first time
private AbsoluteTableIdentifier identifier;
@@ -94,6 +99,7 @@ public class TableInfo implements Serializable, Writable {
public TableInfo() {
dataMapSchemaList = new ArrayList<>();
+ isTransactionalTable = true;
}
/**
@@ -248,7 +254,7 @@ public class TableInfo implements Serializable, Writable {
factTable.write(out);
out.writeLong(lastUpdatedTime);
out.writeUTF(getOrCreateAbsoluteTableIdentifier().getTablePath());
- out.writeBoolean(isUnManagedTable);
+ out.writeBoolean(isTransactionalTable);
boolean isChildSchemaExists =
null != dataMapSchemaList && dataMapSchemaList.size() > 0;
out.writeBoolean(isChildSchemaExists);
@@ -276,7 +282,7 @@ public class TableInfo implements Serializable, Writable {
this.factTable.readFields(in);
this.lastUpdatedTime = in.readLong();
this.tablePath = in.readUTF();
- this.isUnManagedTable = in.readBoolean();
+ this.isTransactionalTable = in.readBoolean();
boolean isChildSchemaExists = in.readBoolean();
this.dataMapSchemaList = new ArrayList<>();
if (isChildSchemaExists) {
@@ -330,11 +336,11 @@ public class TableInfo implements Serializable, Writable {
return parentRelationIdentifiers;
}
- public boolean isUnManagedTable() {
- return isUnManagedTable;
+ public boolean isTransactionalTable() {
+ return isTransactionalTable;
}
- public void setUnManagedTable(boolean unManagedTable) {
- isUnManagedTable = unManagedTable;
+ public void setTransactionalTable(boolean transactionalTable) {
+ isTransactionalTable = transactionalTable;
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
index 6afa280..3f870b8 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
@@ -33,7 +33,7 @@ import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.util.path.CarbonTablePath;
/**
- * This is a readCommittedScope for unmanaged carbon table
+ * This is a readCommittedScope for non transactional carbon table
*/
@InterfaceAudience.Internal
@InterfaceStability.Stable
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index d2d458e..bc410ce 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -309,10 +309,10 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
.createDimensionInfoAndGetCurrentBlockQueryDimension(blockExecutionInfo,
queryModel.getProjectionDimensions(), tableBlockDimensions,
segmentProperties.getComplexDimensions(), queryModel.getProjectionMeasures().size(),
- queryModel.getTable().getTableInfo().isUnManagedTable());
+ queryModel.getTable().getTableInfo().isTransactionalTable());
blockExecutionInfo.setBlockId(
CarbonUtil.getBlockId(queryModel.getAbsoluteTableIdentifier(), filePath, segmentId,
- queryModel.getTable().getTableInfo().isUnManagedTable()));
+ queryModel.getTable().getTableInfo().isTransactionalTable()));
blockExecutionInfo.setDeleteDeltaFilePath(deleteDeltaFiles);
blockExecutionInfo.setStartBlockletIndex(startBlockletIndex);
blockExecutionInfo.setNumberOfBlockletToScan(numberOfBlockletToScan);
@@ -520,7 +520,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
.createMeasureInfoAndGetCurrentBlockQueryMeasures(executionInfo,
queryModel.getProjectionMeasures(),
tableBlock.getSegmentProperties().getMeasures(),
- queryModel.getTable().getTableInfo().isUnManagedTable());
+ queryModel.getTable().getTableInfo().isTransactionalTable());
// setting the measure aggregator for all aggregation function selected
// in query
executionInfo.getMeasureInfo().setMeasureDataTypes(queryProperties.measureDataTypes);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
index 3b477ab..c69ba6c 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
@@ -58,13 +58,13 @@ public class RestructureUtil {
* @param queryDimensions
* @param tableBlockDimensions
* @param tableComplexDimension
- * @param isUnManagedTable
+ * @param isTransactionalTable
* @return list of query dimension which is present in the table block
*/
public static List<ProjectionDimension> createDimensionInfoAndGetCurrentBlockQueryDimension(
BlockExecutionInfo blockExecutionInfo, List<ProjectionDimension> queryDimensions,
List<CarbonDimension> tableBlockDimensions, List<CarbonDimension> tableComplexDimension,
- int measureCount, boolean isUnManagedTable) {
+ int measureCount, boolean isTransactionalTable) {
List<ProjectionDimension> presentDimension =
new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
boolean[] isDimensionExists = new boolean[queryDimensions.size()];
@@ -84,7 +84,8 @@ public class RestructureUtil {
queryDimension.getDimension().getDataType();
} else {
for (CarbonDimension tableDimension : tableBlockDimensions) {
- if (isColumnMatches(isUnManagedTable, queryDimension.getDimension(), tableDimension)) {
+ if (isColumnMatches(isTransactionalTable, queryDimension.getDimension(),
+ tableDimension)) {
ProjectionDimension currentBlockDimension = new ProjectionDimension(tableDimension);
tableDimension.getColumnSchema()
.setPrecision(queryDimension.getDimension().getColumnSchema().getPrecision());
@@ -106,7 +107,8 @@ public class RestructureUtil {
continue;
}
for (CarbonDimension tableDimension : tableComplexDimension) {
- if (isColumnMatches(isUnManagedTable, queryDimension.getDimension(), tableDimension)) {
+ if (isColumnMatches(isTransactionalTable, queryDimension.getDimension(),
+ tableDimension)) {
ProjectionDimension currentBlockDimension = new ProjectionDimension(tableDimension);
// TODO: for complex dimension set scale and precision by traversing
// the child dimensions
@@ -143,19 +145,19 @@ public class RestructureUtil {
}
/**
- * Match the columns for managed and unmanaged tables
- * @param isUnManagedTable
+ * Match the columns for transactional and non transactional tables
+ * @param isTransactionalTable
* @param queryColumn
* @param tableColumn
* @return
*/
- private static boolean isColumnMatches(boolean isUnManagedTable,
+ private static boolean isColumnMatches(boolean isTransactionalTable,
CarbonColumn queryColumn, CarbonColumn tableColumn) {
- // If it is unmanaged table just check the column names, no need to validate column id as
- // multiple sdk's output placed in a single folder doesn't have same column ID but can
- // have same column name
+ // If it is non transactional table just check the column names, no need to validate
+ // column id as multiple sdk's output placed in a single folder doesn't have same
+ // column ID but can have same column name
return (tableColumn.getColumnId().equals(queryColumn.getColumnId()) ||
- (isUnManagedTable && tableColumn.getColName().equals(queryColumn.getColName())));
+ (!isTransactionalTable && tableColumn.getColName().equals(queryColumn.getColName())));
}
/**
@@ -355,12 +357,12 @@ public class RestructureUtil {
* @param blockExecutionInfo
* @param queryMeasures measures present in query
* @param currentBlockMeasures current block measures
- * @param isUnManagedTable
+ * @param isTransactionalTable
* @return measures present in the block
*/
public static List<ProjectionMeasure> createMeasureInfoAndGetCurrentBlockQueryMeasures(
BlockExecutionInfo blockExecutionInfo, List<ProjectionMeasure> queryMeasures,
- List<CarbonMeasure> currentBlockMeasures, boolean isUnManagedTable) {
+ List<CarbonMeasure> currentBlockMeasures, boolean isTransactionalTable) {
MeasureInfo measureInfo = new MeasureInfo();
List<ProjectionMeasure> presentMeasure = new ArrayList<>(queryMeasures.size());
int numberOfMeasureInQuery = queryMeasures.size();
@@ -373,7 +375,7 @@ public class RestructureUtil {
// then setting measure exists is true
// otherwise adding a default value of a measure
for (CarbonMeasure carbonMeasure : currentBlockMeasures) {
- if (isColumnMatches(isUnManagedTable, carbonMeasure, queryMeasure.getMeasure())) {
+ if (isColumnMatches(isTransactionalTable, carbonMeasure, queryMeasure.getMeasure())) {
ProjectionMeasure currentBlockMeasure = new ProjectionMeasure(carbonMeasure);
carbonMeasure.getColumnSchema().setDataType(queryMeasure.getMeasure().getDataType());
carbonMeasure.getColumnSchema().setPrecision(queryMeasure.getMeasure().getPrecision());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index 0e2976a..8c24bd1 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -240,7 +240,7 @@ public class SegmentUpdateStatusManager {
* @throws Exception
*/
public String[] getDeleteDeltaFilePath(String blockFilePath, String segmentId) throws Exception {
- String blockId = CarbonUtil.getBlockId(identifier, blockFilePath, segmentId, false);
+ String blockId = CarbonUtil.getBlockId(identifier, blockFilePath, segmentId, true);
String tupleId;
if (isPartitionTable) {
tupleId = CarbonTablePath.getShortBlockIdForPartitionTable(blockId);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 91b35f5..27ec202 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2911,17 +2911,18 @@ public final class CarbonUtil {
* @param identifier
* @param filePath
* @param segmentId
+ * @param isTransactionalTable
* @return
*/
public static String getBlockId(AbsoluteTableIdentifier identifier, String filePath,
- String segmentId, boolean isUnmangedTable) {
+ String segmentId, boolean isTransactionalTable) {
String blockId;
String blockName = filePath.substring(filePath.lastIndexOf("/") + 1, filePath.length());
String tablePath = identifier.getTablePath();
if (filePath.startsWith(tablePath)) {
String factDir = CarbonTablePath.getFactDir(tablePath);
- if (filePath.startsWith(factDir) || isUnmangedTable) {
+ if (filePath.startsWith(factDir) || !isTransactionalTable) {
blockId = "Part0" + CarbonCommonConstants.FILE_SEPARATOR + "Segment_" + segmentId
+ CarbonCommonConstants.FILE_SEPARATOR + blockName;
} else {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index a72a6bf..403c85d 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -99,7 +99,8 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
"mapreduce.input.carboninputformat.filter.predicate";
private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection";
private static final String TABLE_INFO = "mapreduce.input.carboninputformat.tableinfo";
- private static final String UNMANAGED_TABLE = "mapreduce.input.carboninputformat.unmanaged";
+ private static final String CARBON_TRANSACTIONAL_TABLE =
+ "mapreduce.input.carboninputformat.transactional";
private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
private static final String CARBON_CONVERTER = "mapreduce.input.carboninputformat.converter";
private static final String DATA_MAP_DSTR = "mapreduce.input.carboninputformat.datamapdstr";
@@ -161,8 +162,9 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
configuration.set(FileInputFormat.INPUT_DIR, tablePath);
}
- public static void setUnmanagedTable(Configuration configuration, boolean isUnmanagedTable) {
- configuration.set(UNMANAGED_TABLE, String.valueOf(isUnmanagedTable));
+ public static void setTransactionalTable(Configuration configuration,
+ boolean isTransactionalTable) {
+ configuration.set(CARBON_TRANSACTIONAL_TABLE, String.valueOf(isTransactionalTable));
}
public static void setPartitionIdList(Configuration configuration, List<String> partitionIds) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index 6f65d7d..2851de2 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -157,10 +157,14 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
}
String uniqueId = null;
if (overwriteSet) {
- if (segmentSize == 0) {
- newMetaEntry.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
+ if (!loadModel.isCarbonTransactionalTable()) {
+ CarbonLoaderUtil.deleteNonTransactionalTableForInsertOverwrite(loadModel);
+ } else {
+ if (segmentSize == 0) {
+ newMetaEntry.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
+ }
+ uniqueId = overwritePartitions(loadModel, newMetaEntry, uuid);
}
- uniqueId = overwritePartitions(loadModel, newMetaEntry, uuid);
} else {
CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, false, uuid);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 7b5b8d1..f93be63 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -92,8 +92,8 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
private static final Log LOG = LogFactory.getLog(CarbonTableInputFormat.class);
private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
private static final String CARBON_CONVERTER = "mapreduce.input.carboninputformat.converter";
- private static final String CARBON_UNMANAGED_TABLE =
- "mapreduce.input.carboninputformat.unmanaged";
+ private static final String CARBON_TRANSACTIONAL_TABLE =
+ "mapreduce.input.carboninputformat.transactional";
public static final String DATABASE_NAME = "mapreduce.input.carboninputformat.databaseName";
public static final String TABLE_NAME = "mapreduce.input.carboninputformat.tableName";
// a cache for carbon table, it will be used in task side
@@ -627,10 +627,10 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
throws IOException {
if (readCommittedScope == null) {
ReadCommittedScope readCommittedScope;
- if (job.getConfiguration().getBoolean(CARBON_UNMANAGED_TABLE, false)) {
- readCommittedScope = new LatestFilesReadCommittedScope(identifier.getTablePath());
- } else {
+ if (job.getConfiguration().getBoolean(CARBON_TRANSACTIONAL_TABLE, true)) {
readCommittedScope = new TableStatusReadCommittedScope(identifier);
+ } else {
+ readCommittedScope = new LatestFilesReadCommittedScope(identifier.getTablePath());
}
this.readCommittedScope = readCommittedScope;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index f93f849..43e0221 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -68,6 +68,8 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
private static final String TEMP_STORE_LOCATIONS = "mapreduce.carbontable.tempstore.locations";
private static final String OVERWRITE_SET = "mapreduce.carbontable.set.overwrite";
public static final String COMPLEX_DELIMITERS = "mapreduce.carbontable.complex_delimiters";
+ private static final String CARBON_TRANSACTIONAL_TABLE =
+ "mapreduce.input.carboninputformat.transactional";
public static final String SERIALIZATION_NULL_FORMAT =
"mapreduce.carbontable.serialization.null.format";
public static final String BAD_RECORDS_LOGGER_ENABLE =
@@ -271,6 +273,7 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
CarbonProperties carbonProperty = CarbonProperties.getInstance();
model.setDatabaseName(CarbonTableOutputFormat.getDatabaseName(conf));
model.setTableName(CarbonTableOutputFormat.getTableName(conf));
+ model.setCarbonTransactionalTable(true);
model.setCarbonDataLoadSchema(new CarbonDataLoadSchema(getCarbonTable(conf)));
model.setTablePath(getTablePath(conf));
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
index b2c2d39..9075012 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
@@ -128,6 +128,7 @@ public class StoreCreator {
loadModel.setLoadMetadataDetails(new ArrayList<LoadMetadataDetails>());
loadModel.setTablePath(absoluteTableIdentifier.getTablePath());
loadModel.setDateFormat(null);
+ loadModel.setCarbonTransactionalTable(table.isTransactionalTable());
loadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS));
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index 8ac2905..36c7414 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -88,7 +88,8 @@ public class CarbonInputFormatUtil {
if (partitionNames != null) {
CarbonInputFormat.setPartitionsToPrune(conf, partitionNames);
}
- CarbonInputFormat.setUnmanagedTable(conf, carbonTable.getTableInfo().isUnManagedTable());
+ CarbonInputFormat
+ .setTransactionalTable(conf, carbonTable.getTableInfo().isTransactionalTable());
CarbonProjection columnProjection = new CarbonProjection(projectionColumns);
return createInputFormat(conf, carbonTable.getAbsoluteTableIdentifier(),
filterExpression, columnProjection, dataMapJob);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
index e12af63..6a8c40d 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
@@ -94,6 +94,7 @@ object CarbonDataStoreCreator {
loadModel.setTableName(
absoluteTableIdentifier.getCarbonTableIdentifier.getTableName)
loadModel.setFactFilePath(dataFilePath)
+ loadModel.setCarbonTransactionalTable(table.isTransactionalTable)
loadModel.setLoadMetadataDetails(new ArrayList[LoadMetadataDetails]())
loadModel.setTablePath(absoluteTableIdentifier.getTablePath)
CarbonProperties.getInstance
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark-common-test/src/test/resources/nontransactional.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/nontransactional.csv b/integration/spark-common-test/src/test/resources/nontransactional.csv
new file mode 100644
index 0000000..c22303b
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/nontransactional.csv
@@ -0,0 +1,3 @@
+name, age, height
+arvind, 33, 6.2
+bill, 35, 7.3
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
index 7841a23..9646c1d 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
@@ -55,7 +55,7 @@ class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with Be
.toString()
try {
- val builder = CarbonWriter.builder()
+ val builder = CarbonWriter.builder().isTransactionalTable(true)
val writer =
if (persistSchema) {
builder.persistSchemaFile(true)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
index f421d44..16f19a7 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
@@ -64,7 +64,7 @@ class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAnd
.toString()
try {
- val builder = CarbonWriter.builder()
+ val builder = CarbonWriter.builder().isTransactionalTable(true)
val writer =
if (persistSchema) {
builder.persistSchemaFile(true)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
new file mode 100644
index 0000000..7798403
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.createTable
+
+import java.io.{File, FileFilter}
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.junit.Assert
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}
+
+
+class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
+
+ var writerPath = new File(this.getClass.getResource("/").getPath
+ +
+ "../." +
+ "./src/test/resources/SparkCarbonFileFormat/WriterOutput/")
+ .getCanonicalPath
+ // getCanonicalPath returns a path with '\' on Windows, but the code expects '/'. Should this conversion be handled in the writer code instead?
+ writerPath = writerPath.replace("\\", "/");
+
+ def buildTestDataSingleFile(): Any = {
+ FileUtils.deleteDirectory(new File(writerPath))
+ buildTestData(3,false)
+ }
+
+ def buildTestDataMultipleFiles(): Any = {
+ FileUtils.deleteDirectory(new File(writerPath))
+ buildTestData(1000000,false)
+ }
+
+ def buildTestDataTwice(): Any = {
+ FileUtils.deleteDirectory(new File(writerPath))
+ buildTestData(3,false)
+ buildTestData(3,false)
+ }
+
+ // prepare sdk writer output
+ def buildTestData(rows:Int, persistSchema:Boolean): Any = {
+ val schema = new StringBuilder()
+ .append("[ \n")
+ .append(" {\"name\":\"string\"},\n")
+ .append(" {\"age\":\"int\"},\n")
+ .append(" {\"height\":\"double\"}\n")
+ .append("]")
+ .toString()
+
+ try {
+ val builder = CarbonWriter.builder()
+ val writer =
+ if (persistSchema) {
+ builder.persistSchemaFile(true)
+ builder.withSchema(Schema.parseJson(schema))
+ .outputPath(writerPath)
+ .isTransactionalTable(false)
+ .uniqueIdentifier(System.currentTimeMillis)
+ .buildWriterForCSVInput()
+ } else {
+ builder.withSchema(Schema.parseJson(schema))
+ .outputPath(writerPath)
+ .isTransactionalTable(false)
+ .uniqueIdentifier(System.currentTimeMillis).withBlockSize(2)
+ .buildWriterForCSVInput()
+ }
+ var i = 0
+ while (i < rows) {
+ writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
+ i += 1
+ }
+ writer.close()
+ } catch {
+ case ex: Exception => None
+ case _ => None
+ }
+ }
+
+ def cleanTestData() = {
+ FileUtils.deleteDirectory(new File(writerPath))
+ }
+
+ def deleteFile(path: String, extension: String): Unit = {
+ val file: CarbonFile = FileFactory
+ .getCarbonFile(path, FileFactory.getFileType(path))
+
+ for (eachDir <- file.listFiles) {
+ if (!eachDir.isDirectory) {
+ if (eachDir.getName.endsWith(extension)) {
+ CarbonUtil.deleteFoldersAndFilesSilent(eachDir)
+ }
+ } else {
+ deleteFile(eachDir.getPath, extension)
+ }
+ }
+ }
+
+ override def beforeAll(): Unit = {
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+ }
+
+ override def afterAll(): Unit = {
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+ }
+
+ test("test create External Table with Schema with partition, should ignore schema and partition")
+ {
+ buildTestDataSingleFile()
+ assert(new File(writerPath).exists())
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+ // with partition
+ sql(
+ s"""CREATE EXTERNAL TABLE sdkOutputTable(name string) PARTITIONED BY (age int) STORED BY
+ |'carbondata' LOCATION
+ |'$writerPath' """.stripMargin)
+
+ checkAnswer(sql("select * from sdkOutputTable"), Seq(Row("robot0", 0, 0.0),
+ Row("robot1", 1, 0.5),
+ Row("robot2", 2, 1.0)))
+
+ sql("DROP TABLE sdkOutputTable")
+ // drop table should not delete the files
+ assert(new File(writerPath).exists())
+ cleanTestData()
+ }
+
+
+ test("test create External Table with insert into feature")
+ {
+ buildTestData(3, false)
+ assert(new File(writerPath).exists())
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+ sql("DROP TABLE IF EXISTS t1")
+
+ // with partition
+ sql(
+ s"""CREATE EXTERNAL TABLE sdkOutputTable(name string) PARTITIONED BY (age int) STORED BY
+ |'carbondata' LOCATION
+ |'$writerPath' """.stripMargin)
+
+ checkAnswer(sql("select * from sdkOutputTable"), Seq(Row("robot0", 0, 0.0),
+ Row("robot1", 1, 0.5),
+ Row("robot2", 2, 1.0)))
+
+ sql("create table if not exists t1 (name string, age int, height double) STORED BY 'org.apache.carbondata.format'")
+ sql (s"""insert into t1 values ("aaaaa", 12, 20)""").show(200,false)
+ sql("select * from t1").show(200,false)
+ sql("insert into sdkOutputTable select * from t1").show(200,false)
+
+ checkAnswer(sql(s"""select * from sdkOutputTable where age = 12"""),
+ Seq(Row("aaaaa", 12, 20.0)))
+
+ sql("DROP TABLE sdkOutputTable")
+ sql("drop table t1")
+
+ // drop table should not delete the files
+ assert(new File(writerPath).exists())
+ cleanTestData()
+ }
+
+ test("test create External Table with insert overwrite")
+ {
+ buildTestData(3, false)
+ assert(new File(writerPath).exists())
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+ sql("DROP TABLE IF EXISTS t1")
+ sql("DROP TABLE IF EXISTS t2")
+ sql("DROP TABLE IF EXISTS t3")
+
+ // with partition
+ sql(
+ s"""CREATE EXTERNAL TABLE sdkOutputTable(name string) PARTITIONED BY (age int) STORED BY
+ |'carbondata' LOCATION
+ |'$writerPath' """.stripMargin)
+
+ checkAnswer(sql("select * from sdkOutputTable"), Seq(Row("robot0", 0, 0.0),
+ Row("robot1", 1, 0.5),
+ Row("robot2", 2, 1.0)))
+
+ sql("create table if not exists t1 (name string, age int, height double) STORED BY 'org.apache.carbondata.format'")
+ sql (s"""insert into t1 values ("aaaaa", 12, 20)""").show(200,false)
+
+ checkAnswer(sql(s"""select count(*) from sdkOutputTable where age = 1"""),
+ Seq(Row(1)))
+
+ sql("insert overwrite table sdkOutputTable select * from t1").show(200,false)
+
+ checkAnswer(sql(s"""select count(*) from sdkOutputTable where age = 1"""),
+ Seq(Row(0)))
+
+ sql("DROP TABLE if exists sdkOutputTable")
+ sql("drop table if exists t1")
+
+ // drop table should not delete the files
+ assert(new File(writerPath).exists())
+ cleanTestData()
+ }
+
+
+ test("test create External Table with Load")
+ {
+ buildTestData(3, false)
+ assert(new File(writerPath).exists())
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+ sql("DROP TABLE IF EXISTS t1")
+ sql("DROP TABLE IF EXISTS t2")
+ sql("DROP TABLE IF EXISTS t3")
+
+ // with partition
+ sql(
+ s"""CREATE EXTERNAL TABLE sdkOutputTable(name string) PARTITIONED BY (age int) STORED BY
+ |'carbondata' LOCATION
+ |'$writerPath' """.stripMargin)
+
+ checkAnswer(sql("select * from sdkOutputTable"), Seq(Row("robot0", 0, 0.0),
+ Row("robot1", 1, 0.5),
+ Row("robot2", 2, 1.0)))
+
+ sql("create table if not exists t1 (name string, age int, height double) STORED BY 'org.apache.carbondata.format'")
+ sql (s"""insert into t1 values ("aaaaa", 12, 20)""").show(200,false)
+
+ checkAnswer(sql(s"""select count(*) from sdkOutputTable where age = 1"""),
+ Seq(Row(1)))
+
+ // scalastyle:off
+ sql(
+ s"""
+ | LOAD DATA LOCAL INPATH '$resourcesPath/nontransactional.csv'
+ | INTO TABLE sdkOutputTable
+ | OPTIONS('HEADER'='true')
+ """.stripMargin)
+
+ checkAnswer(sql(s"""select count(*) from sdkOutputTable where height = 6.2"""),
+ Seq(Row(1)))
+
+ sql("DROP TABLE if exists sdkOutputTable")
+ sql("drop table if exists t1")
+
+ // drop table should not delete the files
+ assert(new File(writerPath).exists())
+ cleanTestData()
+ }
+
+
+
+ test("read non transactional table, files written from sdk Writer Output") {
+ buildTestDataSingleFile()
+ assert(new File(writerPath).exists())
+ sql("DROP TABLE IF EXISTS sdkOutputTable1")
+
+ sql(
+ s"""CREATE EXTERNAL TABLE sdkOutputTable1 STORED BY 'carbondata' LOCATION
+ |'$writerPath' """.stripMargin)
+
+ checkAnswer(sql("select * from sdkOutputTable1"), Seq(Row("robot0", 0, 0.0),
+ Row("robot1", 1, 0.5),
+ Row("robot2", 2, 1.0)))
+
+ checkAnswer(sql("select name from sdkOutputTable1"), Seq(Row("robot0"),
+ Row("robot1"),
+ Row("robot2")))
+
+ checkAnswer(sql("select age from sdkOutputTable1"), Seq(Row(0), Row(1), Row(2)))
+
+ checkAnswer(sql("select * from sdkOutputTable1 where age > 1 and age < 8"),
+ Seq(Row("robot2", 2, 1.0)))
+
+ checkAnswer(sql("select * from sdkOutputTable1 where name = 'robot2'"),
+ Seq(Row("robot2", 2, 1.0)))
+
+ checkAnswer(sql("select * from sdkOutputTable1 where name like '%obot%' limit 2"),
+ Seq(Row("robot0", 0, 0.0),
+ Row("robot1", 1, 0.5)))
+
+ checkAnswer(sql("select sum(age) from sdkOutputTable1 where name like 'robot%'"), Seq(Row(3)))
+
+ checkAnswer(sql("select count(*) from sdkOutputTable1 where name like 'robot%' "), Seq(Row(3)))
+
+ checkAnswer(sql("select count(*) from sdkOutputTable1"), Seq(Row(3)))
+
+ sql("DROP TABLE sdkOutputTable1")
+ // drop table should not delete the files
+ assert(new File(writerPath).exists())
+ cleanTestData()
+ }
+
+ test("Test Blocked operations for non transactional table ") {
+ buildTestDataSingleFile()
+ assert(new File(writerPath).exists())
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+ sql(
+ s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+ |'$writerPath' """.stripMargin)
+
+ //1. alter datatype
+ var exception = intercept[MalformedCarbonCommandException] {
+ sql("Alter table sdkOutputTable change age age BIGINT")
+ }
+ assert(exception.getMessage()
+ .contains("Unsupported operation on non transactional table"))
+
+ //2. Datamap creation
+ exception = intercept[MalformedCarbonCommandException] {
+ sql(
+ "CREATE DATAMAP agg_sdkOutputTable ON TABLE sdkOutputTable USING \"preaggregate\" AS " +
+ "SELECT name, sum(age) FROM sdkOutputTable GROUP BY name,age")
+ }
+ assert(exception.getMessage()
+ .contains("Unsupported operation on non transactional table"))
+
+ //3. compaction
+ exception = intercept[MalformedCarbonCommandException] {
+ sql("ALTER TABLE sdkOutputTable COMPACT 'MAJOR'")
+ }
+ assert(exception.getMessage()
+ .contains("Unsupported operation on non transactional table"))
+
+ //4. Show segments
+ exception = intercept[MalformedCarbonCommandException] {
+ sql("Show segments for table sdkOutputTable").show(false)
+ }
+ assert(exception.getMessage()
+ .contains("Unsupported operation on non transactional table"))
+
+ //5. Delete segment by ID
+ exception = intercept[MalformedCarbonCommandException] {
+ sql("DELETE FROM TABLE sdkOutputTable WHERE SEGMENT.ID IN (0)")
+ }
+ assert(exception.getMessage()
+ .contains("Unsupported operation on non transactional table"))
+
+ //6. Delete segment by date
+ exception = intercept[MalformedCarbonCommandException] {
+ sql("DELETE FROM TABLE sdkOutputTable WHERE SEGMENT.STARTTIME BEFORE '2017-06-01 12:05:06'")
+ }
+ assert(exception.getMessage()
+ .contains("Unsupported operation on non transactional table"))
+
+ //7. Update Segment
+ exception = intercept[MalformedCarbonCommandException] {
+ sql("UPDATE sdkOutputTable SET (age) = (age + 9) ").show(false)
+ }
+ assert(exception.getMessage()
+ .contains("Unsupported operation on non transactional table"))
+
+ //8. Delete Segment
+ exception = intercept[MalformedCarbonCommandException] {
+ sql("DELETE FROM sdkOutputTable where name='robot1'").show(false)
+ }
+ assert(exception.getMessage()
+ .contains("Unsupported operation on non transactional table"))
+
+ //9. Show partition
+ exception = intercept[MalformedCarbonCommandException] {
+ sql("Show partitions sdkOutputTable").show(false)
+ }
+ assert(exception.getMessage()
+ .contains("Unsupported operation on non transactional table"))
+
+ //10. Streaming table creation
+ // No need as External table don't accept table properties
+
+ //11. Alter table rename command
+ exception = intercept[MalformedCarbonCommandException] {
+ sql("ALTER TABLE sdkOutputTable RENAME to newTable")
+ }
+ assert(exception.getMessage()
+ .contains("Unsupported operation on non transactional table"))
+
+ sql("DROP TABLE sdkOutputTable")
+ //drop table should not delete the files
+ assert(new File(writerPath).exists())
+ cleanTestData()
+ }
+
+ test("test create External Table With Schema, should ignore the schema provided") {
+ buildTestDataSingleFile()
+ assert(new File(writerPath).exists())
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+ // with schema
+ sql(
+ s"""CREATE EXTERNAL TABLE sdkOutputTable(age int) STORED BY
+ |'carbondata' LOCATION
+ |'$writerPath' """.stripMargin)
+
+ checkAnswer(sql("select * from sdkOutputTable"), Seq(Row("robot0", 0, 0.0),
+ Row("robot1", 1, 0.5),
+ Row("robot2", 2, 1.0)))
+
+ sql("DROP TABLE sdkOutputTable")
+ // drop table should not delete the files
+ assert(new File(writerPath).exists())
+ cleanTestData()
+ }
+
+ test("Read sdk writer output file without Carbondata file should fail") {
+ buildTestDataSingleFile()
+ deleteFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
+ assert(new File(writerPath).exists())
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+ val exception = intercept[Exception] {
+ // data source file format
+ sql(
+ s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+ |'$writerPath' """.stripMargin)
+ }
+ assert(exception.getMessage()
+ .contains("Operation not allowed: Invalid table path provided:"))
+
+ // drop table should not delete the files
+ assert(new File(writerPath).exists())
+ cleanTestData()
+ }
+
+
+ test("Read sdk writer output file without any file should fail") {
+ buildTestDataSingleFile()
+ deleteFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
+ deleteFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
+ assert(new File(writerPath).exists())
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+ val exception = intercept[Exception] {
+ //data source file format
+ sql(
+ s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+ |'$writerPath' """.stripMargin)
+
+ sql("select * from sdkOutputTable").show(false)
+ }
+ assert(exception.getMessage()
+ .contains("Operation not allowed: Invalid table path provided:"))
+
+ // drop table should not delete the files
+ assert(new File(writerPath).exists())
+ cleanTestData()
+ }
+
+ test("Read sdk writer output multiple files ") {
+ buildTestDataMultipleFiles()
+ assert(new File(writerPath).exists())
+ val folder = new File(writerPath)
+ val dataFiles = folder.listFiles(new FileFilter() {
+ override def accept(pathname: File): Boolean = {
+ pathname.getName
+ .endsWith(CarbonCommonConstants.FACT_FILE_EXT)
+ }
+ })
+ Assert.assertNotNull(dataFiles)
+ Assert.assertNotEquals(1, dataFiles.length)
+
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+ sql("DROP TABLE IF EXISTS t1")
+
+ sql(
+ s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+ |'$writerPath' """.stripMargin)
+
+ checkAnswer(sql("select count(*) from sdkOutputTable"), Seq(Row(1000000)))
+
+ // drop table should not delete the files
+ assert(new File(writerPath).exists())
+ cleanTestData()
+ }
+
+ test("Read two sdk writer outputs with same column name placed in same folder") {
+ buildTestDataTwice()
+ assert(new File(writerPath).exists())
+
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+ sql(
+ s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+ |'$writerPath' """.stripMargin)
+
+
+ checkAnswer(sql("select * from sdkOutputTable"), Seq(Row("robot0", 0, 0.0),
+ Row("robot1", 1, 0.5),
+ Row("robot2", 2, 1.0),
+ Row("robot0", 0, 0.0),
+ Row("robot1", 1, 0.5),
+ Row("robot2", 2, 1.0)))
+
+ // drop table should not delete the files
+ assert(new File(writerPath).exists())
+ cleanTestData()
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
index de91f2a..53dadf6 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
@@ -54,7 +54,7 @@ object TestSparkCarbonFileFormatWithSparkSession {
.toString()
try {
- val builder = CarbonWriter.builder()
+ val builder = CarbonWriter.builder().isTransactionalTable(true)
val writer =
if (persistSchema) {
builder.persistSchemaFile(true)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestUnmanagedCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestUnmanagedCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestUnmanagedCarbonTable.scala
deleted file mode 100644
index a6ee807..0000000
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestUnmanagedCarbonTable.scala
+++ /dev/null
@@ -1,408 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.testsuite.createTable
-
-import java.io.{File, FileFilter}
-
-import org.apache.commons.io.FileUtils
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.test.util.QueryTest
-import org.junit.Assert
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}
-
-
-class TestUnmanagedCarbonTable extends QueryTest with BeforeAndAfterAll {
-
- var writerPath = new File(this.getClass.getResource("/").getPath
- +
- "../." +
- "./src/test/resources/SparkCarbonFileFormat/WriterOutput/")
- .getCanonicalPath
- //getCanonicalPath gives path with \, so code expects /. Need to handle in code ?
- writerPath = writerPath.replace("\\", "/");
-
- def buildTestDataSingleFile(): Any = {
- FileUtils.deleteDirectory(new File(writerPath))
- buildTestData(3,false)
- }
-
- def buildTestDataMultipleFiles(): Any = {
- FileUtils.deleteDirectory(new File(writerPath))
- buildTestData(1000000,false)
- }
-
- def buildTestDataTwice(): Any = {
- FileUtils.deleteDirectory(new File(writerPath))
- buildTestData(3,false)
- buildTestData(3,false)
- }
-
- // prepare sdk writer output
- def buildTestData(rows:Int, persistSchema:Boolean): Any = {
- val schema = new StringBuilder()
- .append("[ \n")
- .append(" {\"name\":\"string\"},\n")
- .append(" {\"age\":\"int\"},\n")
- .append(" {\"height\":\"double\"}\n")
- .append("]")
- .toString()
-
- try {
- val builder = CarbonWriter.builder()
- val writer =
- if (persistSchema) {
- builder.persistSchemaFile(true)
- builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).unManagedTable(true)
- .uniqueIdentifier(
- System.currentTimeMillis)
- .buildWriterForCSVInput()
- } else {
- builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).unManagedTable(true)
- .uniqueIdentifier(
- System.currentTimeMillis).withBlockSize(2)
- .buildWriterForCSVInput()
- }
- var i = 0
- while (i < rows) {
- writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
- i += 1
- }
- writer.close()
- } catch {
- case ex: Exception => None
- case _ => None
- }
- }
-
- def cleanTestData() = {
- FileUtils.deleteDirectory(new File(writerPath))
- }
-
- def deleteFile(path: String, extension: String): Unit = {
- val file: CarbonFile = FileFactory
- .getCarbonFile(path, FileFactory.getFileType(path))
-
- for (eachDir <- file.listFiles) {
- if (!eachDir.isDirectory) {
- if (eachDir.getName.endsWith(extension)) {
- CarbonUtil.deleteFoldersAndFilesSilent(eachDir)
- }
- } else {
- deleteFile(eachDir.getPath, extension)
- }
- }
- }
-
- override def beforeAll(): Unit = {
- sql("DROP TABLE IF EXISTS sdkOutputTable")
- }
-
- override def afterAll(): Unit = {
- sql("DROP TABLE IF EXISTS sdkOutputTable")
- }
-
- test("test create External Table with Schema with partition, should ignore schema and partition")
- {
- buildTestDataSingleFile()
- assert(new File(writerPath).exists())
- sql("DROP TABLE IF EXISTS sdkOutputTable")
-
- // with partition
- sql(
- s"""CREATE EXTERNAL TABLE sdkOutputTable(name string) PARTITIONED BY (age int) STORED BY
- |'carbondata' LOCATION
- |'$writerPath' """.stripMargin)
-
- checkAnswer(sql("select * from sdkOutputTable"), Seq(Row("robot0", 0, 0.0),
- Row("robot1", 1, 0.5),
- Row("robot2", 2, 1.0)))
-
- sql("DROP TABLE sdkOutputTable")
- // drop table should not delete the files
- assert(new File(writerPath).exists())
- cleanTestData()
- }
-
- test("read unmanaged table, files written from sdk Writer Output)") {
- buildTestDataSingleFile()
- assert(new File(writerPath).exists())
- sql("DROP TABLE IF EXISTS sdkOutputTable1")
-
- sql(
- s"""CREATE EXTERNAL TABLE sdkOutputTable1 STORED BY 'carbondata' LOCATION
- |'$writerPath' """.stripMargin)
-
- checkAnswer(sql("select * from sdkOutputTable1"), Seq(Row("robot0", 0, 0.0),
- Row("robot1", 1, 0.5),
- Row("robot2", 2, 1.0)))
-
- checkAnswer(sql("select name from sdkOutputTable1"), Seq(Row("robot0"),
- Row("robot1"),
- Row("robot2")))
-
- checkAnswer(sql("select age from sdkOutputTable1"), Seq(Row(0), Row(1), Row(2)))
-
- checkAnswer(sql("select * from sdkOutputTable1 where age > 1 and age < 8"),
- Seq(Row("robot2", 2, 1.0)))
-
- checkAnswer(sql("select * from sdkOutputTable1 where name = 'robot2'"),
- Seq(Row("robot2", 2, 1.0)))
-
- checkAnswer(sql("select * from sdkOutputTable1 where name like '%obot%' limit 2"),
- Seq(Row("robot0", 0, 0.0),
- Row("robot1", 1, 0.5)))
-
- checkAnswer(sql("select sum(age) from sdkOutputTable1 where name like 'robot%'"), Seq(Row(3)))
-
- checkAnswer(sql("select count(*) from sdkOutputTable1 where name like 'robot%' "), Seq(Row(3)))
-
- checkAnswer(sql("select count(*) from sdkOutputTable1"), Seq(Row(3)))
-
- sql("DROP TABLE sdkOutputTable1")
- // drop table should not delete the files
- assert(new File(writerPath).exists())
- cleanTestData()
- }
-
- test("Test Blocked operations for unmanaged table ") {
- buildTestDataSingleFile()
- assert(new File(writerPath).exists())
- sql("DROP TABLE IF EXISTS sdkOutputTable")
-
- sql(
- s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
- |'$writerPath' """.stripMargin)
-
- //1. alter datatype
- var exception = intercept[MalformedCarbonCommandException] {
- sql("Alter table sdkOutputTable change age age BIGINT")
- }
- assert(exception.getMessage()
- .contains("Unsupported operation on unmanaged table"))
-
- //2. Load
- exception = intercept[MalformedCarbonCommandException] {
- sql("LOAD DATA LOCAL INPATH '/path/to/data' INTO TABLE sdkOutputTable ")
- }
- assert(exception.getMessage()
- .contains("Unsupported operation on unmanaged table"))
-
- //3. Datamap creation
- exception = intercept[MalformedCarbonCommandException] {
- sql(
- "CREATE DATAMAP agg_sdkOutputTable ON TABLE sdkOutputTable USING \"preaggregate\" AS " +
- "SELECT name, sum(age) FROM sdkOutputTable GROUP BY name,age")
- }
- assert(exception.getMessage()
- .contains("Unsupported operation on unmanaged table"))
-
- //4. Insert Into
- exception = intercept[MalformedCarbonCommandException] {
- sql("insert into table sdkOutputTable SELECT 20,'robotX',2.5")
- }
- assert(exception.getMessage()
- .contains("Unsupported operation on unmanaged table"))
-
- //5. compaction
- exception = intercept[MalformedCarbonCommandException] {
- sql("ALTER TABLE sdkOutputTable COMPACT 'MAJOR'")
- }
- assert(exception.getMessage()
- .contains("Unsupported operation on unmanaged table"))
-
- //6. Show segments
- exception = intercept[MalformedCarbonCommandException] {
- sql("Show segments for table sdkOutputTable").show(false)
- }
- assert(exception.getMessage()
- .contains("Unsupported operation on unmanaged table"))
-
- //7. Delete segment by ID
- exception = intercept[MalformedCarbonCommandException] {
- sql("DELETE FROM TABLE sdkOutputTable WHERE SEGMENT.ID IN (0)")
- }
- assert(exception.getMessage()
- .contains("Unsupported operation on unmanaged table"))
-
- //8. Delete segment by date
- exception = intercept[MalformedCarbonCommandException] {
- sql("DELETE FROM TABLE sdkOutputTable WHERE SEGMENT.STARTTIME BEFORE '2017-06-01 12:05:06'")
- }
- assert(exception.getMessage()
- .contains("Unsupported operation on unmanaged table"))
-
- //9. Update column
- exception = intercept[MalformedCarbonCommandException] {
- sql("UPDATE sdkOutputTable SET (age) = (age + 9) ").show(false)
- }
- assert(exception.getMessage()
- .contains("Unsupported operation on unmanaged table"))
-
- //10. Delete column
- exception = intercept[MalformedCarbonCommandException] {
- sql("DELETE FROM sdkOutputTable where name='robot1'").show(false)
- }
- assert(exception.getMessage()
- .contains("Unsupported operation on unmanaged table"))
-
- //11. Show partition
- exception = intercept[MalformedCarbonCommandException] {
- sql("Show partitions sdkOutputTable").show(false)
- }
- assert(exception.getMessage()
- .contains("Unsupported operation on unmanaged table"))
-
- //12. Streaming table creation
- // No need as External table don't accept table properties
-
- //13. Alter table rename command
- exception = intercept[MalformedCarbonCommandException] {
- sql("ALTER TABLE sdkOutputTable RENAME to newTable")
- }
- assert(exception.getMessage()
- .contains("Unsupported operation on unmanaged table"))
-
- sql("DROP TABLE sdkOutputTable")
- //drop table should not delete the files
- assert(new File(writerPath).exists())
- cleanTestData()
- }
-
- test("test create External Table With Schema, should ignore the schema provided") {
- buildTestDataSingleFile()
- assert(new File(writerPath).exists())
- sql("DROP TABLE IF EXISTS sdkOutputTable")
-
- // with schema
- sql(
- s"""CREATE EXTERNAL TABLE sdkOutputTable(age int) STORED BY
- |'carbondata' LOCATION
- |'$writerPath' """.stripMargin)
-
- checkAnswer(sql("select * from sdkOutputTable"), Seq(Row("robot0", 0, 0.0),
- Row("robot1", 1, 0.5),
- Row("robot2", 2, 1.0)))
-
- sql("DROP TABLE sdkOutputTable")
- // drop table should not delete the files
- assert(new File(writerPath).exists())
- cleanTestData()
- }
-
- test("Read sdk writer output file without Carbondata file should fail") {
- buildTestDataSingleFile()
- deleteFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
- assert(new File(writerPath).exists())
- sql("DROP TABLE IF EXISTS sdkOutputTable")
-
- val exception = intercept[Exception] {
- // data source file format
- sql(
- s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
- |'$writerPath' """.stripMargin)
- }
- assert(exception.getMessage()
- .contains("Operation not allowed: Invalid table path provided:"))
-
- // drop table should not delete the files
- assert(new File(writerPath).exists())
- cleanTestData()
- }
-
-
- test("Read sdk writer output file without any file should fail") {
- buildTestDataSingleFile()
- deleteFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
- deleteFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
- assert(new File(writerPath).exists())
- sql("DROP TABLE IF EXISTS sdkOutputTable")
-
- val exception = intercept[Exception] {
- //data source file format
- sql(
- s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
- |'$writerPath' """.stripMargin)
-
- sql("select * from sdkOutputTable").show(false)
- }
- assert(exception.getMessage()
- .contains("Operation not allowed: Invalid table path provided:"))
-
- // drop table should not delete the files
- assert(new File(writerPath).exists())
- cleanTestData()
- }
-
- test("Read sdk writer output multiple files ") {
- buildTestDataMultipleFiles()
- assert(new File(writerPath).exists())
- val folder = new File(writerPath)
- val dataFiles = folder.listFiles(new FileFilter() {
- override def accept(pathname: File): Boolean = {
- pathname.getName
- .endsWith(CarbonCommonConstants.FACT_FILE_EXT)
- }
- })
- Assert.assertNotNull(dataFiles)
- Assert.assertNotEquals(1, dataFiles.length)
-
- sql("DROP TABLE IF EXISTS sdkOutputTable")
-
- sql(
- s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
- |'$writerPath' """.stripMargin)
-
- checkAnswer(sql("select count(*) from sdkOutputTable"), Seq(Row(1000000)))
-
- // drop table should not delete the files
- assert(new File(writerPath).exists())
- cleanTestData()
- }
-
- test("Read two sdk writer outputs with same column name placed in same folder") {
- buildTestDataTwice()
- assert(new File(writerPath).exists())
-
- sql("DROP TABLE IF EXISTS sdkOutputTable")
-
- sql(
- s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
- |'$writerPath' """.stripMargin)
-
-
- checkAnswer(sql("select * from sdkOutputTable"), Seq(Row("robot0", 0, 0.0),
- Row("robot1", 1, 0.5),
- Row("robot2", 2, 1.0),
- Row("robot0", 0, 0.0),
- Row("robot1", 1, 0.5),
- Row("robot2", 2, 1.0)))
-
- // drop table should not delete the files
- assert(new File(writerPath).exists())
- cleanTestData()
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 6d67daf..954eefc 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -519,7 +519,7 @@ class CarbonScanRDD[T: ClassTag](
CarbonInputFormat.setPartitionsToPrune(conf, partitionNames.asJava)
}
- CarbonInputFormat.setUnmanagedTable(conf, tableInfo.isUnManagedTable)
+ CarbonInputFormat.setTransactionalTable(conf, tableInfo.isTransactionalTable)
createInputFormat(conf)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index e978614..db22d6b 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -275,6 +275,7 @@ object CarbonDataRDDFactory {
loadModel.setTableName(table.getCarbonTableIdentifier.getTableName)
loadModel.setDatabaseName(table.getCarbonTableIdentifier.getDatabaseName)
loadModel.setTablePath(table.getTablePath)
+ loadModel.setCarbonTransactionalTable(table.isTransactionalTable)
loadModel.readAndSetLoadMetadataDetails()
val loadStartTime = CarbonUpdateUtil.readCurrentTime()
loadModel.setFactTimeStamp(loadStartTime)
@@ -300,7 +301,7 @@ object CarbonDataRDDFactory {
var res: Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = null
// create new segment folder in carbon store
- if (updateModel.isEmpty) {
+ if (updateModel.isEmpty && carbonLoadModel.isCarbonTransactionalTable) {
CarbonLoaderUtil.checkAndCreateCarbonDataLocation(carbonLoadModel.getSegmentId, carbonTable)
}
var loadStatus = SegmentStatus.SUCCESS
@@ -313,7 +314,7 @@ object CarbonDataRDDFactory {
CarbonTablePath.addSegmentPrefix(carbonLoadModel.getSegmentId) + LockUsage.LOCK)
try {
- if (segmentLock.lockWithRetries()) {
+ if (!carbonLoadModel.isCarbonTransactionalTable || segmentLock.lockWithRetries()) {
if (updateModel.isDefined) {
res = loadDataFrameForUpdate(
sqlContext,
@@ -493,13 +494,14 @@ object CarbonDataRDDFactory {
}
// as no record loaded in new segment, new segment should be deleted
val newEntryLoadStatus =
- if (!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildDataMap &&
- !CarbonLoaderUtil.isValidSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)) {
- LOGGER.warn("Cannot write load metadata file as there is no data to load")
- SegmentStatus.MARKED_FOR_DELETE
- } else {
- loadStatus
- }
+ if (carbonLoadModel.isCarbonTransactionalTable &&
+ !carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildDataMap &&
+ !CarbonLoaderUtil.isValidSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)) {
+ LOGGER.warn("Cannot write load metadata file as there is no data to load")
+ SegmentStatus.MARKED_FOR_DELETE
+ } else {
+ loadStatus
+ }
writeDictionary(carbonLoadModel, result, writeAll = false)
@@ -813,6 +815,10 @@ object CarbonDataRDDFactory {
true)
CarbonLoaderUtil
.addDataIndexSizeIntoMetaEntry(metadataDetails, carbonLoadModel.getSegmentId, carbonTable)
+
+ if (!carbonLoadModel.isCarbonTransactionalTable && overwriteTable) {
+ CarbonLoaderUtil.deleteNonTransactionalTableForInsertOverwrite(carbonLoadModel)
+ }
val done = CarbonLoaderUtil.recordNewLoadMetadata(metadataDetails, carbonLoadModel, false,
overwriteTable, uuid)
if (!done) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
index e29986a..488a53d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
@@ -77,7 +77,8 @@ case class CarbonCountStar(
val job = new Job(jobConf)
FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
CarbonInputFormat
- .setUnmanagedTable(job.getConfiguration, carbonTable.getTableInfo.isUnManagedTable)
+ .setTransactionalTable(job.getConfiguration,
+ carbonTable.getTableInfo.isTransactionalTable)
(job, carbonInputFormat)
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index c8d9fe4..3600854 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -313,9 +313,10 @@ object CarbonSource {
} else {
val tableInfo = CarbonUtil.convertGsonToTableInfo(properties.asJava)
val isExternal = properties.getOrElse("isExternal", "false")
- val isUnManagedTable = properties.getOrElse("isUnManaged", "false").contains("true")
- tableInfo.setUnManagedTable(isUnManagedTable)
- if (!isUnManagedTable && !metaStore.isReadFromHiveMetaStore) {
+ val isTransactionalTable = properties.getOrElse("isTransactional", "true")
+ .contains("true")
+ tableInfo.setTransactionalTable(isTransactionalTable)
+ if (isTransactionalTable && !metaStore.isReadFromHiveMetaStore) {
// save to disk
metaStore.saveToDisk(tableInfo, properties("tablePath"))
// remove schema string from map as we don't store carbon schema to hive metastore
@@ -337,15 +338,16 @@ object CarbonSource {
val model = createTableInfoFromParams(properties, dataSchema, identifier)
val tableInfo: TableInfo = TableNewProcessor(model)
val isExternal = properties.getOrElse("isExternal", "false")
- val isUnManagedTable = properties.getOrElse("isUnManaged", "false").contains("true")
+ val isTransactionalTable = properties.getOrElse("isTransactional", "true")
+ .contains("true")
val tablePath = properties.getOrElse("path", "")
tableInfo.setTablePath(identifier.getTablePath)
- tableInfo.setUnManagedTable(isUnManagedTable)
+ tableInfo.setTransactionalTable(isTransactionalTable)
tableInfo.setDatabaseName(identifier.getDatabaseName)
val schemaEvolutionEntry = new SchemaEvolutionEntry
schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
tableInfo.getFactTable.getSchemaEvalution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
- val map = if (!metaStore.isReadFromHiveMetaStore && !isUnManagedTable) {
+ val map = if (!metaStore.isReadFromHiveMetaStore && isTransactionalTable) {
metaStore.saveToDisk(tableInfo, identifier.getTablePath)
new java.util.HashMap[String, String]()
} else {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index 60366c4..03f165c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -56,8 +56,8 @@ case class CarbonCreateDataMapCommand(
case _ => null
}
- if (mainTable != null && mainTable.getTableInfo.isUnManagedTable) {
- throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
+ if (mainTable != null && !mainTable.getTableInfo.isTransactionalTable) {
+ throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
}
if (mainTable != null && mainTable.getDataMapSchema(dataMapName) != null) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index c462c9e..e12f052 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -77,8 +77,8 @@ case class CarbonAlterTableCompactionCommand(
}
relation.carbonTable
}
- if (table.getTableInfo.isUnManagedTable) {
- throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
+ if (!table.getTableInfo.isTransactionalTable) {
+ throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
}
if (CarbonUtil.hasAggregationDataMap(table) ||
@@ -126,6 +126,7 @@ case class CarbonAlterTableCompactionCommand(
// Need to fill dimension relation
carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
carbonLoadModel.setTableName(table.getTableName)
+ carbonLoadModel.setCarbonTransactionalTable(table.isTransactionalTable)
carbonLoadModel.setDatabaseName(table.getDatabaseName)
carbonLoadModel.setTablePath(table.getTablePath)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
index 57ccd82..165a032 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
@@ -36,8 +36,8 @@ case class CarbonDeleteLoadByIdCommand(
Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
- if (carbonTable.getTableInfo.isUnManagedTable) {
- throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
+ if (!carbonTable.getTableInfo.isTransactionalTable) {
+ throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
}
// if insert overwrite in progress, do not allow delete segment
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
index 7d0655d..19f5100 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
@@ -37,8 +37,8 @@ case class CarbonDeleteLoadByLoadDateCommand(
Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
- if (carbonTable.getTableInfo.isUnManagedTable) {
- throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
+ if (!carbonTable.getTableInfo.isTransactionalTable) {
+ throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
}
// if insert overwrite in progress, do not allow delete segment