You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/07/25 17:37:41 UTC
[04/15] carbondata git commit: [CARBONDATA-1286] Change Query related RDD to use TableInfo
[CARBONDATA-1286] Change Query related RDD to use TableInfo
This closes #1146
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/ac5aee18
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/ac5aee18
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/ac5aee18
Branch: refs/heads/master
Commit: ac5aee187f4f1c3332226435ac9d1aa7c396ae29
Parents: 0158968
Author: jackylk <ja...@huawei.com>
Authored: Sat Jul 8 17:34:44 2017 +0800
Committer: Raghunandan S <ca...@gmail.com>
Committed: Tue Jul 25 17:35:09 2017 +0800
----------------------------------------------------------------------
.../core/metadata/CarbonMetadata.java | 3 +-
.../ThriftWrapperSchemaConverterImpl.java | 8 +-
.../core/metadata/datatype/DataType.java | 44 +++++++-
.../core/metadata/encoder/Encoding.java | 20 ++++
.../core/metadata/schema/SchemaEvolution.java | 1 +
.../core/metadata/schema/table/CarbonTable.java | 96 ++++-------------
.../core/metadata/schema/table/TableInfo.java | 106 +++++++++++++++----
.../core/metadata/schema/table/TableSchema.java | 29 ++++-
.../core/metadata/schema/table/Writable.java | 31 ++++++
.../metadata/schema/table/WritableUtil.java | 45 ++++++++
.../schema/table/column/ColumnSchema.java | 84 ++++++++++++---
.../dictionary/client/DictionaryClientTest.java | 3 +-
...ncrementalColumnDictionaryGeneratorTest.java | 3 +-
.../ServerDictionaryGeneratorTest.java | 3 +-
.../generator/TableDictionaryGeneratorTest.java | 3 +-
.../core/metadata/CarbonMetadataTest.java | 6 +-
.../ThriftWrapperSchemaConverterImplTest.java | 9 +-
.../metadata/schema/table/CarbonTableTest.java | 3 +-
.../table/CarbonTableWithComplexTypesTest.java | 3 +-
.../carbondata/hadoop/CarbonInputFormat.java | 90 ++++++++--------
.../hadoop/ft/CarbonInputMapperTest.java | 2 +-
.../hadoop/test/util/StoreCreator.java | 1 -
.../testsuite/createTable/TestTableIdTest.scala | 75 -------------
.../apache/carbondata/spark/rdd/CarbonRDD.scala | 17 +++
.../carbondata/spark/rdd/CarbonScanRDD.scala | 14 +--
.../spark/rdd/DataManagementFunc.scala | 1 -
.../execution/command/carbonTableSchema.scala | 1 -
.../scala/org/apache/spark/sql/CarbonScan.scala | 4 +-
.../execution/command/carbonTableSchema.scala | 2 -
.../sql/CarbonDatasourceHadoopRelation.scala | 42 +++++---
.../spark/sql/CarbonDictionaryDecoder.scala | 15 ++-
.../execution/CarbonLateDecodeStrategy.scala | 19 ++--
.../sql/execution/command/IUDCommands.scala | 8 +-
.../execution/command/carbonTableSchema.scala | 2 -
.../processing/model/CarbonLoadModel.java | 7 --
.../carbondata/processing/StoreCreator.java | 1 -
36 files changed, 481 insertions(+), 320 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
index 079540f..75fe78b 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
@@ -78,8 +78,7 @@ public final class CarbonMetadata {
CarbonTable carbonTable = tableInfoMap.get(convertToLowerCase(tableInfo.getTableUniqueName()));
if (null == carbonTable || carbonTable.getTableLastUpdatedTime() < tableInfo
.getLastUpdatedTime()) {
- carbonTable = new CarbonTable();
- carbonTable.loadCarbonTable(tableInfo);
+ carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
tableInfoMap.put(convertToLowerCase(tableInfo.getTableUniqueName()), carbonTable);
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
index 235a7ba..745f477 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
@@ -272,12 +272,8 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
org.apache.carbondata.format.TableSchema thriftFactTable =
fromWrapperToExternalTableSchema(wrapperTableInfo.getFactTable());
- List<org.apache.carbondata.format.TableSchema> thriftAggTables =
- new ArrayList<org.apache.carbondata.format.TableSchema>();
- for (TableSchema wrapperAggTableSchema : wrapperTableInfo.getAggregateTableList()) {
- thriftAggTables.add(fromWrapperToExternalTableSchema(wrapperAggTableSchema));
- }
- return new org.apache.carbondata.format.TableInfo(thriftFactTable, thriftAggTables);
+ return new org.apache.carbondata.format.TableInfo(thriftFactTable, new ArrayList<org.apache
+ .carbondata.format.TableSchema>());
}
/* (non-Javadoc)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
index 9371451..e97cce0 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
@@ -46,8 +46,8 @@ public enum DataType {
// size of the value of this data type, negative value means variable length
private int sizeInBytes;
- DataType(int value ,String name, int sizeInBytes) {
- this.precedenceOrder = value;
+ /**
+ * @param precedenceOrder stored in {@code precedenceOrder} (NOTE(review): presumably used to
+ * rank data types against each other — confirm against callers)
+ * @param name name of the type, stored in {@code name}
+ * @param sizeInBytes size of a value of this type; a negative value means variable length
+ */
+ DataType(int precedenceOrder, String name, int sizeInBytes) {
+ this.precedenceOrder = precedenceOrder;
this.name = name;
this.sizeInBytes = sizeInBytes;
}
@@ -74,4 +74,44 @@ public enum DataType {
}
return (int) (Math.log(getSizeInBytes()) / Math.log(2));
}
+
+ public static DataType valueOf(int ordinal) {
+ if (ordinal == STRING.ordinal()) {
+ return STRING;
+ } else if (ordinal == DATE.ordinal()) {
+ return DATE;
+ } else if (ordinal == TIMESTAMP.ordinal()) {
+ return TIMESTAMP;
+ } else if (ordinal == BOOLEAN.ordinal()) {
+ return BOOLEAN;
+ } else if (ordinal == SHORT.ordinal()) {
+ return SHORT;
+ } else if (ordinal == INT.ordinal()) {
+ return INT;
+ } else if (ordinal == FLOAT.ordinal()) {
+ return FLOAT;
+ } else if (ordinal == LONG.ordinal()) {
+ return LONG;
+ } else if (ordinal == DOUBLE.ordinal()) {
+ return DOUBLE;
+ } else if (ordinal == NULL.ordinal()) {
+ return NULL;
+ } else if (ordinal == DECIMAL.ordinal()) {
+ return DECIMAL;
+ } else if (ordinal == ARRAY.ordinal()) {
+ return ARRAY;
+ } else if (ordinal == STRUCT.ordinal()) {
+ return STRUCT;
+ } else if (ordinal == MAP.ordinal()) {
+ return MAP;
+ } else if (ordinal == BYTE.ordinal()) {
+ return BYTE;
+ } else if (ordinal == BYTE_ARRAY.ordinal()) {
+ return BYTE_ARRAY;
+ } else if (ordinal == SHORT_INT.ordinal()) {
+ return SHORT_INT;
+ } else {
+ throw new RuntimeException("create DataType with invalid ordinal: " + ordinal);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/core/src/main/java/org/apache/carbondata/core/metadata/encoder/Encoding.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/encoder/Encoding.java b/core/src/main/java/org/apache/carbondata/core/metadata/encoder/Encoding.java
index fe24975..e3d7a9a 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/encoder/Encoding.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/encoder/Encoding.java
@@ -27,4 +27,24 @@ public enum Encoding {
BIT_PACKED,
DIRECT_DICTIONARY,
IMPLICIT;
+
+ public static Encoding valueOf(int ordinal) {
+ if (ordinal == DICTIONARY.ordinal()) {
+ return DICTIONARY;
+ } else if (ordinal == DELTA.ordinal()) {
+ return DELTA;
+ } else if (ordinal == RLE.ordinal()) {
+ return RLE;
+ } else if (ordinal == INVERTED_INDEX.ordinal()) {
+ return INVERTED_INDEX;
+ } else if (ordinal == BIT_PACKED.ordinal()) {
+ return BIT_PACKED;
+ } else if (ordinal == DIRECT_DICTIONARY.ordinal()) {
+ return DIRECT_DICTIONARY;
+ } else if (ordinal == IMPLICIT.ordinal()) {
+ return IMPLICIT;
+ } else {
+ throw new RuntimeException("create Encoding with invalid ordinal: " + ordinal);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaEvolution.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaEvolution.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaEvolution.java
index 4bdbe8d..6960736 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaEvolution.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaEvolution.java
@@ -47,4 +47,5 @@ public class SchemaEvolution implements Serializable {
public void setSchemaEvolutionEntryList(List<SchemaEvolutionEntry> schemaEvolutionEntryList) {
this.schemaEvolutionEntryList = schemaEvolutionEntryList;
}
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index cec8007..01b3022 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -20,8 +20,6 @@ package org.apache.carbondata.core.metadata.schema.table;
import java.io.Serializable;
import java.util.*;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
@@ -40,15 +38,14 @@ import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
public class CarbonTable implements Serializable {
/**
- * serialization id
+ * the cached table info
*/
- private static final long serialVersionUID = 8696507171227156445L;
+ private TableInfo tableInfo;
/**
- * Attribute for Carbon table LOGGER
+ * serialization id
*/
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(CarbonTable.class.getName());
+ private static final long serialVersionUID = 8696507171227156445L;
/**
* Absolute table identifier
@@ -103,11 +100,6 @@ public class CarbonTable implements Serializable {
private String tableUniqueName;
/**
- * Aggregate tables name
- */
- private List<String> aggregateTablesName;
-
- /**
* metadata file path (check if it is really required )
*/
private String metaDataFilepath;
@@ -132,13 +124,12 @@ public class CarbonTable implements Serializable {
*/
private int numberOfNoDictSortColumns;
- public CarbonTable() {
+ /**
+ * Creates a table whose dimension, measure, bucket, partition and column-order maps are
+ * initialised but still empty. The constructor is private: instances are created and filled
+ * by {@link #buildFromTableInfo(TableInfo)}.
+ */
+ private CarbonTable() {
this.tableDimensionsMap = new HashMap<String, List<CarbonDimension>>();
this.tableImplicitDimensionsMap = new HashMap<String, List<CarbonDimension>>();
this.tableMeasuresMap = new HashMap<String, List<CarbonMeasure>>();
this.tableBucketMap = new HashMap<>();
this.tablePartitionMap = new HashMap<>();
- this.aggregateTablesName = new ArrayList<String>();
this.createOrderColumn = new HashMap<String, List<CarbonColumn>>();
this.tablePrimitiveDimensionsMap = new HashMap<String, List<CarbonDimension>>();
}
@@ -146,39 +137,26 @@ public class CarbonTable implements Serializable {
/**
* @param tableInfo
*/
- public void loadCarbonTable(TableInfo tableInfo) {
- this.blockSize = getTableBlockSizeInMB(tableInfo);
- this.tableLastUpdatedTime = tableInfo.getLastUpdatedTime();
- this.tableUniqueName = tableInfo.getTableUniqueName();
- this.metaDataFilepath = tableInfo.getMetaDataFilepath();
- //setting unique table identifier
- CarbonTableIdentifier carbontableIdentifier =
- new CarbonTableIdentifier(tableInfo.getDatabaseName(),
- tableInfo.getFactTable().getTableName(), tableInfo.getFactTable().getTableId());
- this.absoluteTableIdentifier =
- new AbsoluteTableIdentifier(tableInfo.getStorePath(), carbontableIdentifier);
-
- fillDimensionsAndMeasuresForTables(tableInfo.getFactTable());
- fillCreateOrderColumn(tableInfo.getFactTable().getTableName());
- List<TableSchema> aggregateTableList = tableInfo.getAggregateTableList();
- for (TableSchema aggTable : aggregateTableList) {
- this.aggregateTablesName.add(aggTable.getTableName());
- fillDimensionsAndMeasuresForTables(aggTable);
- if (aggTable.getBucketingInfo() != null) {
- tableBucketMap.put(aggTable.getTableName(), aggTable.getBucketingInfo());
- }
- if (aggTable.getPartitionInfo() != null) {
- tablePartitionMap.put(aggTable.getTableName(), aggTable.getPartitionInfo());
- }
- }
+ public static CarbonTable buildFromTableInfo(TableInfo tableInfo) {
+ CarbonTable table = new CarbonTable();
+ // keep the source schema so getTableInfo() can hand it back later
+ table.tableInfo = tableInfo;
+ table.blockSize = tableInfo.getTableBlockSizeInMB();
+ table.tableLastUpdatedTime = tableInfo.getLastUpdatedTime();
+ table.tableUniqueName = tableInfo.getTableUniqueName();
+ table.metaDataFilepath = tableInfo.getMetaDataFilepath();
+ // TableInfo caches the identifier it creates, so tables built from one TableInfo share it
+ table.absoluteTableIdentifier = tableInfo.getOrCreateAbsoluteTableIdentifier();
+
+ // derive the fact table's column metadata through the fill* helpers
+ table.fillDimensionsAndMeasuresForTables(tableInfo.getFactTable());
+ table.fillCreateOrderColumn(tableInfo.getFactTable().getTableName());
+ // bucketing and partitioning are optional; record them only when the fact table defines them
if (tableInfo.getFactTable().getBucketingInfo() != null) {
- tableBucketMap.put(tableInfo.getFactTable().getTableName(),
+ table.tableBucketMap.put(tableInfo.getFactTable().getTableName(),
tableInfo.getFactTable().getBucketingInfo());
}
if (tableInfo.getFactTable().getPartitionInfo() != null) {
- tablePartitionMap.put(tableInfo.getFactTable().getTableName(),
+ table.tablePartitionMap.put(tableInfo.getFactTable().getTableName(),
tableInfo.getFactTable().getPartitionInfo());
}
+ return table;
}
/**
@@ -208,29 +186,6 @@ public class CarbonTable implements Serializable {
this.createOrderColumn.put(tableName, columns);
}
- /**
- * This method will return the table size. Default table block size will be considered
- * in case not specified by the user
- *
- * @param tableInfo
- * @return
- */
- private int getTableBlockSizeInMB(TableInfo tableInfo) {
- String tableBlockSize = null;
- // In case of old store there will not be any map for table properties so table properties
- // will be null
- Map<String, String> tableProperties = tableInfo.getFactTable().getTableProperties();
- if (null != tableProperties) {
- tableBlockSize = tableProperties.get(CarbonCommonConstants.TABLE_BLOCKSIZE);
- }
- if (null == tableBlockSize) {
- tableBlockSize = CarbonCommonConstants.BLOCK_SIZE_DEFAULT_VAL;
- LOGGER.info("Table block size not specified for " + tableInfo.getTableUniqueName()
- + ". Therefore considering the default value "
- + CarbonCommonConstants.BLOCK_SIZE_DEFAULT_VAL + " MB");
- }
- return Integer.parseInt(tableBlockSize);
- }
/**
* Fill allDimensions and allMeasures for carbon table
@@ -410,13 +365,6 @@ public class CarbonTable implements Serializable {
}
/**
- * @return list of aggregate TablesName
- */
- public List<String> getAggregateTablesName() {
- return aggregateTablesName;
- }
-
- /**
* @return the tableLastUpdatedTime
*/
public long getTableLastUpdatedTime() {
@@ -699,8 +647,6 @@ public class CarbonTable implements Serializable {
return sort_columsList;
}
-
-
public int getNumberOfSortColumns() {
return numberOfSortColumns;
}
@@ -708,4 +654,8 @@ public class CarbonTable implements Serializable {
public int getNumberOfNoDictSortColumns() {
return numberOfNoDictSortColumns;
}
+
+  /**
+   * @return the {@link TableInfo} this table was built from by
+   *     {@link #buildFromTableInfo(TableInfo)}
+   */
+  public TableInfo getTableInfo() {
+    return this.tableInfo;
+  }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
index 328928d..78cd97b 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
@@ -16,17 +16,30 @@
*/
package org.apache.carbondata.core.metadata.schema.table;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Map;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
/**
* Store the information about the table.
* it stores the fact table as well as aggregate table present in the schema
*/
-public class TableInfo implements Serializable {
+public class TableInfo implements Serializable, Writable {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(TableInfo.class.getName());
/**
* serialization version
@@ -49,11 +62,6 @@ public class TableInfo implements Serializable {
private TableSchema factTable;
/**
- * list of aggregate table
- */
- private List<TableSchema> aggregateTableList;
-
- /**
* last updated time to update the table if any changes
*/
private long lastUpdatedTime;
@@ -68,8 +76,10 @@ public class TableInfo implements Serializable {
*/
private String storePath;
+ // this identifier is lazily created the first time it is requested, then cached
+ private AbsoluteTableIdentifier identifier;
+
public TableInfo() {
- aggregateTableList = new ArrayList<TableSchema>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
}
/**
@@ -87,20 +97,6 @@ public class TableInfo implements Serializable {
}
/**
- * @return the aggregateTableList
- */
- public List<TableSchema> getAggregateTableList() {
- return aggregateTableList;
- }
-
- /**
- * @param aggregateTableList the aggregateTableList to set
- */
- public void setAggregateTableList(List<TableSchema> aggregateTableList) {
- this.aggregateTableList = aggregateTableList;
- }
-
- /**
* @return the databaseName
*/
public String getDatabaseName() {
@@ -206,4 +202,68 @@ public class TableInfo implements Serializable {
}
return true;
}
+
+  /**
+   * Returns the table's block size in MB, taken from the fact table's
+   * {@code CarbonCommonConstants.TABLE_BLOCKSIZE} property. When the property map is absent
+   * (stores written before table properties existed) or the property is not set, logs that fact
+   * and falls back to {@code CarbonCommonConstants.BLOCK_SIZE_DEFAULT_VAL}.
+   *
+   * @throws NumberFormatException if the configured value is not a valid integer
+   */
+  int getTableBlockSizeInMB() {
+    Map<String, String> properties = getFactTable().getTableProperties();
+    String configured =
+        (properties == null) ? null : properties.get(CarbonCommonConstants.TABLE_BLOCKSIZE);
+    if (configured != null) {
+      return Integer.parseInt(configured);
+    }
+    LOGGER.info("Table block size not specified for " + getTableUniqueName()
+        + ". Therefore considering the default value "
+        + CarbonCommonConstants.BLOCK_SIZE_DEFAULT_VAL + " MB");
+    return Integer.parseInt(CarbonCommonConstants.BLOCK_SIZE_DEFAULT_VAL);
+  }
+
+  /**
+   * Writes this table info to {@code out} in this order: database name, table unique name, the
+   * fact table schema, last-updated time, metadata file path and store path.
+   * {@link #readFields(DataInput)} reads them back in exactly the same order. The lazily created
+   * identifier is not written; it is rebuilt on demand.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(databaseName);
+    out.writeUTF(tableUniqueName);
+    factTable.write(out);
+    out.writeLong(lastUpdatedTime);
+    // These paths stay null when never set, and DataOutput.writeUTF throws
+    // NullPointerException on null, so they are written behind a presence flag.
+    writeNullableUTF(out, metaDataFilepath);
+    writeNullableUTF(out, storePath);
+  }
+
+  /**
+   * Replaces the state of this table info with what {@link #write(DataOutput)} produced.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.databaseName = in.readUTF();
+    this.tableUniqueName = in.readUTF();
+    this.factTable = new TableSchema();
+    this.factTable.readFields(in);
+    this.lastUpdatedTime = in.readLong();
+    this.metaDataFilepath = readNullableUTF(in);
+    this.storePath = readNullableUTF(in);
+    // any identifier cached from previous state is stale now; rebuild it lazily
+    this.identifier = null;
+  }
+
+  /** Writes a presence flag, followed by {@code value} when it is non-null. */
+  private static void writeNullableUTF(DataOutput out, String value) throws IOException {
+    out.writeBoolean(value != null);
+    if (value != null) {
+      out.writeUTF(value);
+    }
+  }
+
+  /** Reads a string written by {@link #writeNullableUTF(DataOutput, String)}. */
+  private static String readNullableUTF(DataInput in) throws IOException {
+    return in.readBoolean() ? in.readUTF() : null;
+  }
+
+  /**
+   * Returns this table's absolute identifier. On the first call it is built from the store path,
+   * the database name, and the fact table's name and id, then cached for later calls.
+   *
+   * <p>NOTE(review): the lazy initialisation is not synchronized, and nothing visible here clears
+   * the cache if the store path or fact table is changed afterwards.
+   */
+  public AbsoluteTableIdentifier getOrCreateAbsoluteTableIdentifier() {
+    if (identifier != null) {
+      return identifier;
+    }
+    identifier = new AbsoluteTableIdentifier(storePath,
+        new CarbonTableIdentifier(databaseName, factTable.getTableName(), factTable.getTableId()));
+    return identifier;
+  }
+
+  /**
+   * Encodes this table info into a new byte array using {@link #write(DataOutput)}.
+   *
+   * @return the encoded bytes, readable by {@link #deserialize(byte[])}
+   */
+  public byte[] serialize() throws IOException {
+    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+    DataOutputStream dataOut = new DataOutputStream(buffer);
+    write(dataOut);
+    return buffer.toByteArray();
+  }
+
+  /**
+   * Decodes a table info previously produced by {@link #serialize()}.
+   *
+   * @param bytes the encoded table info
+   * @return a new {@code TableInfo} populated via {@link #readFields(DataInput)}
+   * @throws IOException if {@code bytes} is truncated or otherwise unreadable
+   */
+  public static TableInfo deserialize(byte[] bytes) throws IOException {
+    TableInfo result = new TableInfo();
+    DataInputStream source = new DataInputStream(new ByteArrayInputStream(bytes));
+    result.readFields(source);
+    return result;
+  }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
index f9d848e..a396d19 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
@@ -16,6 +16,9 @@
*/
package org.apache.carbondata.core.metadata.schema.table;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
@@ -30,7 +33,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
/**
* Persisting the table information
*/
-public class TableSchema implements Serializable {
+public class TableSchema implements Serializable, Writable {
/**
* serialization version
@@ -198,4 +201,28 @@ public class TableSchema implements Serializable {
public void setPartitionInfo(PartitionInfo partitionInfo) {
this.partitionInfo = partitionInfo;
}
+
+  /**
+   * Writes the table id, the table name, the number of columns and then each column schema.
+   *
+   * <p>NOTE(review): table properties, bucketing info and partition info are not part of this
+   * stream, so a schema read back through {@link #readFields(DataInput)} will not carry them.
+   * Confirm that every caller can live without them.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(tableId);
+    out.writeUTF(tableName);
+    out.writeInt(listOfColumns.size());
+    for (ColumnSchema columnSchema : listOfColumns) {
+      columnSchema.write(out);
+    }
+  }
+
+  /**
+   * Restores the id, name and column list written by {@link #write(DataOutput)}.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    tableId = in.readUTF();
+    tableName = in.readUTF();
+    int remaining = in.readInt();
+    listOfColumns = new ArrayList<>(remaining);
+    while (remaining-- > 0) {
+      ColumnSchema columnSchema = new ColumnSchema();
+      columnSchema.readFields(in);
+      listOfColumns.add(columnSchema);
+    }
+  }
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/Writable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/Writable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/Writable.java
new file mode 100644
index 0000000..94a3b78
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/Writable.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.metadata.schema.table;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Serialization contract with the same shape as Hadoop's {@code org.apache.hadoop.io.Writable}.
+ * It is duplicated here so that this package does not need a Hadoop dependency.
+ */
+public interface Writable {
+
+  /**
+   * Writes the state of this object to {@code out}.
+   */
+  void write(DataOutput out) throws IOException;
+
+  /**
+   * Restores the state of this object from {@code in}, reading fields in the order
+   * {@link #write(DataOutput)} wrote them.
+   */
+  void readFields(DataInput in) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/WritableUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/WritableUtil.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/WritableUtil.java
new file mode 100644
index 0000000..dbcb1a7
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/WritableUtil.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.metadata.schema.table;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Helpers for length-prefixed byte arrays over {@link DataOutput} / {@link DataInput}.
+ * A {@code null} array is encoded as the length {@code -1}.
+ */
+public final class WritableUtil {
+
+  private WritableUtil() {
+    // static helpers only; not instantiable
+  }
+
+  /**
+   * Writes {@code bytes} as an int length followed by the raw bytes, or just {@code -1} when
+   * {@code bytes} is null.
+   */
+  public static void writeByteArray(DataOutput out, byte[] bytes) throws IOException {
+    if (bytes == null) {
+      out.writeInt(-1);
+    } else {
+      out.writeInt(bytes.length);
+      out.write(bytes);
+    }
+  }
+
+  /**
+   * Reads an array written by {@link #writeByteArray(DataOutput, byte[])}.
+   *
+   * @return the array, or {@code null} if a null array was written
+   * @throws IOException if the stream ends early or holds a negative length other than -1
+   */
+  public static byte[] readByteArray(DataInput in) throws IOException {
+    int length = in.readInt();
+    if (length == -1) {
+      return null;
+    }
+    if (length < 0) {
+      // corrupt input: report it as an I/O problem instead of NegativeArraySizeException
+      throw new IOException("Invalid byte array length: " + length);
+    }
+    byte[] b = new byte[length];
+    in.readFully(b);
+    return b;
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
index f5b8116..3680d53 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
@@ -16,17 +16,24 @@
*/
package org.apache.carbondata.core.metadata.schema.table.column;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.Writable;
+import org.apache.carbondata.core.metadata.schema.table.WritableUtil;
/**
* Store the information about the column meta data present the table
*/
-public class ColumnSchema implements Serializable {
+public class ColumnSchema implements Serializable, Writable {
/**
* serialization version
@@ -37,6 +44,7 @@ public class ColumnSchema implements Serializable {
* dataType
*/
private DataType dataType;
+
/**
* Name of the column. If it is a complex data type, we follow a naming rule
* grand_parent_column.parent_column.child_column
@@ -360,17 +368,6 @@ public class ColumnSchema implements Serializable {
}
/**
- * @param property
- * @return
- */
- public String getColumnProperty(String property) {
- if (null != columnProperties) {
- return columnProperties.get(property);
- }
- return null;
- }
-
- /**
* return columnproperties
*/
public Map<String, String> getColumnProperties() {
@@ -421,4 +418,67 @@ public class ColumnSchema implements Serializable {
public void setSortColumn(boolean sortColumn) {
isSortColumn = sortColumn;
}
+
+  /**
+   * Writes this column schema to {@code out}. The field order here is the binary layout and
+   * must match {@link #readFields(DataInput)} exactly.
+   *
+   * <p>The data type and encodings are written as short ordinals. The encoding list and column
+   * properties are written as a short count followed by their entries; a null list or map is
+   * written as count 0. {@code columnReferenceId} and property values are written behind a
+   * presence flag, because {@link DataOutput#writeUTF(String)} throws on null.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeShort(dataType.ordinal());
+    out.writeUTF(columnName);
+    out.writeUTF(columnUniqueId);
+    // may be null if never set
+    writeNullableUTF(out, columnReferenceId);
+    if (encodingList == null) {
+      out.writeShort(0);
+    } else {
+      out.writeShort(encodingList.size());
+      for (Encoding encoding : encodingList) {
+        out.writeShort(encoding.ordinal());
+      }
+    }
+    out.writeBoolean(isDimensionColumn);
+    out.writeInt(scale);
+    out.writeInt(precision);
+    out.writeInt(schemaOrdinal);
+    out.writeInt(numberOfChild);
+    WritableUtil.writeByteArray(out, defaultValue);
+    if (columnProperties == null) {
+      out.writeShort(0);
+    } else {
+      out.writeShort(columnProperties.size());
+      for (Map.Entry<String, String> entry : columnProperties.entrySet()) {
+        out.writeUTF(entry.getKey());
+        writeNullableUTF(out, entry.getValue());
+      }
+    }
+    out.writeBoolean(invisible);
+    out.writeBoolean(isSortColumn);
+  }
+
+  /**
+   * Restores the state written by {@link #write(DataOutput)}. A null encoding list or property
+   * map on the writer side comes back as an empty list or map.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int ordinal = in.readShort();
+    this.dataType = DataType.valueOf(ordinal);
+    this.columnName = in.readUTF();
+    this.columnUniqueId = in.readUTF();
+    this.columnReferenceId = readNullableUTF(in);
+    int encodingListSize = in.readShort();
+    this.encodingList = new ArrayList<>(encodingListSize);
+    for (int i = 0; i < encodingListSize; i++) {
+      ordinal = in.readShort();
+      encodingList.add(Encoding.valueOf(ordinal));
+    }
+    this.isDimensionColumn = in.readBoolean();
+    this.scale = in.readInt();
+    this.precision = in.readInt();
+    this.schemaOrdinal = in.readInt();
+    this.numberOfChild = in.readInt();
+    this.defaultValue = WritableUtil.readByteArray(in);
+    int mapSize = in.readShort();
+    this.columnProperties = new HashMap<>(mapSize);
+    for (int i = 0; i < mapSize; i++) {
+      String key = in.readUTF();
+      String value = readNullableUTF(in);
+      this.columnProperties.put(key, value);
+    }
+    this.invisible = in.readBoolean();
+    this.isSortColumn = in.readBoolean();
+  }
+
+  /** Writes a presence flag, followed by {@code value} when it is non-null. */
+  private static void writeNullableUTF(DataOutput out, String value) throws IOException {
+    out.writeBoolean(value != null);
+    if (value != null) {
+      out.writeUTF(value);
+    }
+  }
+
+  /** Reads a string written by {@link #writeNullableUTF(DataOutput, String)}. */
+  private static String readNullableUTF(DataInput in) throws IOException {
+    return in.readBoolean() ? in.readUTF() : null;
+  }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/core/src/test/java/org/apache/carbondata/core/dictionary/client/DictionaryClientTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/dictionary/client/DictionaryClientTest.java b/core/src/test/java/org/apache/carbondata/core/dictionary/client/DictionaryClientTest.java
index 60d3c26..dc3b232 100644
--- a/core/src/test/java/org/apache/carbondata/core/dictionary/client/DictionaryClientTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/dictionary/client/DictionaryClientTest.java
@@ -88,8 +88,7 @@ public class DictionaryClientTest {
tableInfo.setDatabaseName("test");
storePath = System.getProperty("java.io.tmpdir") + "/tmp";
tableInfo.setStorePath(storePath);
- CarbonTable carbonTable = new CarbonTable();
- carbonTable.loadCarbonTable(tableInfo);
+ CarbonTable carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
// Add the created table to metadata
metadata.addCarbonTable(carbonTable);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/core/src/test/java/org/apache/carbondata/core/dictionary/generator/IncrementalColumnDictionaryGeneratorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/dictionary/generator/IncrementalColumnDictionaryGeneratorTest.java b/core/src/test/java/org/apache/carbondata/core/dictionary/generator/IncrementalColumnDictionaryGeneratorTest.java
index 49db6ce..0bac01a 100644
--- a/core/src/test/java/org/apache/carbondata/core/dictionary/generator/IncrementalColumnDictionaryGeneratorTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/dictionary/generator/IncrementalColumnDictionaryGeneratorTest.java
@@ -166,8 +166,7 @@ public class IncrementalColumnDictionaryGeneratorTest {
System.out.print(dictPath.mkdirs());
tableInfo.setStorePath(storePath);
- CarbonTable carbonTable = new CarbonTable();
- carbonTable.loadCarbonTable(tableInfo);
+ CarbonTable carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
// Add the table to metadata
metadata.addCarbonTable(carbonTable);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/core/src/test/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGeneratorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGeneratorTest.java b/core/src/test/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGeneratorTest.java
index b66331f..d8df99a 100644
--- a/core/src/test/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGeneratorTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGeneratorTest.java
@@ -83,8 +83,7 @@ public class ServerDictionaryGeneratorTest {
tableInfo.setDatabaseName("test");
storePath = System.getProperty("java.io.tmpdir") + "/tmp";
tableInfo.setStorePath(storePath);
- CarbonTable carbonTable = new CarbonTable();
- carbonTable.loadCarbonTable(tableInfo);
+ CarbonTable carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
// Add the created table to metadata
metadata.addCarbonTable(carbonTable);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/core/src/test/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGeneratorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGeneratorTest.java b/core/src/test/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGeneratorTest.java
index 0cb47c4..8a68b72 100644
--- a/core/src/test/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGeneratorTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGeneratorTest.java
@@ -83,8 +83,7 @@ public class TableDictionaryGeneratorTest {
tableInfo.setDatabaseName("test");
storePath = System.getProperty("java.io.tmpdir") + "/tmp";
tableInfo.setStorePath(storePath);
- CarbonTable carbonTable = new CarbonTable();
- carbonTable.loadCarbonTable(tableInfo);
+ CarbonTable carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
// Add the created table to metadata
metadata.addCarbonTable(carbonTable);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java
index 56d14c2..3af0bdb 100644
--- a/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java
@@ -160,7 +160,7 @@ public class CarbonMetadataTest {
}
@Test public void testGetCarbonDimensionBasedOnColIdentifier() {
- CarbonTable carbonTable = new CarbonTable();
+ CarbonTable carbonTable = CarbonTable.buildFromTableInfo(getTableInfo(1000L));
String columnIdentifier = "1";
final List<CarbonDimension> carbonDimensions = new ArrayList();
ColumnSchema colSchema1 = new ColumnSchema();
@@ -186,7 +186,7 @@ public class CarbonMetadataTest {
@Test
public void testGetCarbonDimensionBasedOnColIdentifierWhenChildDimensionColumnEqualsColumnIdentifier() {
- CarbonTable carbonTable = new CarbonTable();
+ CarbonTable carbonTable = CarbonTable.buildFromTableInfo(getTableInfo(1000L));
String columnIdentifier = "9";
final List<CarbonDimension> carbonDimensions = new ArrayList();
ColumnSchema colSchema1 = new ColumnSchema();
@@ -226,7 +226,7 @@ public class CarbonMetadataTest {
}
@Test public void testGetCarbonDimensionBasedOnColIdentifierNullCase() {
- CarbonTable carbonTable = new CarbonTable();
+ CarbonTable carbonTable = CarbonTable.buildFromTableInfo(getTableInfo(1000L));
String columnIdentifier = "3";
final List<CarbonDimension> carbonDimensions = new ArrayList();
ColumnSchema colSchema1 = new ColumnSchema();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
index e8851e0..3961d9c 100644
--- a/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
@@ -1500,10 +1500,6 @@ public class ThriftWrapperSchemaConverterImplTest {
@Mock public TableSchema getFactTable() {
return wrapperTableSchema;
}
-
- @Mock public List<TableSchema> getAggregateTableList() {
- return tableSchemas;
- }
};
new MockUp<TableSchema>() {
@@ -1529,12 +1525,11 @@ public class ThriftWrapperSchemaConverterImplTest {
};
org.apache.carbondata.format.TableSchema thriftFactTable =
new org.apache.carbondata.format.TableSchema("tableId", thriftColumnSchemas, schemaEvol);
- List<org.apache.carbondata.format.TableSchema> thriftAggTables = new ArrayList<>();
- thriftAggTables.add(thriftFactTable);
org.apache.carbondata.format.TableInfo actualResult = thriftWrapperSchemaConverter
.fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName);
org.apache.carbondata.format.TableInfo expectedResult =
- new org.apache.carbondata.format.TableInfo(thriftFactTable, thriftAggTables);
+ new org.apache.carbondata.format.TableInfo(thriftFactTable, new ArrayList<org.apache
+ .carbondata.format.TableSchema>());
assertEquals(expectedResult, actualResult);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java
index 9ed8b56..f5ffe51 100644
--- a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java
@@ -36,8 +36,7 @@ public class CarbonTableTest extends TestCase {
private CarbonTable carbonTable;
@BeforeClass public void setUp() {
- carbonTable = new CarbonTable();
- carbonTable.loadCarbonTable(getTableInfo(1000L));
+ carbonTable = CarbonTable.buildFromTableInfo(getTableInfo(1000L));
}
@AfterClass public void tearDown() {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java
index 9c3ab6a..69cab49 100644
--- a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java
@@ -35,8 +35,7 @@ public class CarbonTableWithComplexTypesTest extends TestCase {
private CarbonTable carbonTable;
@BeforeClass public void setUp() {
- carbonTable = new CarbonTable();
- carbonTable.loadCarbonTable(getTableInfo(1000L));
+ carbonTable = CarbonTable.buildFromTableInfo(getTableInfo(1000L));
}
@AfterClass public void tearDown() {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index 00e420c..6d14424 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -39,6 +39,7 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.metadata.schema.PartitionInfo;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.mutate.UpdateVO;
import org.apache.carbondata.core.mutate.data.BlockMappingVO;
import org.apache.carbondata.core.scan.expression.Expression;
@@ -99,51 +100,50 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
private static final String FILTER_PREDICATE =
"mapreduce.input.carboninputformat.filter.predicate";
private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection";
- private static final String CARBON_TABLE = "mapreduce.input.carboninputformat.table";
+ private static final String TABLE_INFO = "mapreduce.input.carboninputformat.tableinfo";
private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
+ // a cache for carbon table, it will be used in task side
+ private CarbonTable carbonTable;
+
/**
- * It is optional, if user does not set then it reads from store
- *
- * @param configuration
- * @param carbonTable
- * @throws IOException
+ * Set the `tableInfo` in `configuration`
+ * <p>
+ * Stores {@code tableInfo} as a string (via ObjectSerializationUtil) under the TABLE_INFO key.
+ * A null {@code tableInfo} leaves {@code configuration} unchanged. When nothing is stored,
+ * getOrCreateCarbonTable falls back to reading the schema from the store.
+ *
+ * @param configuration job configuration to update
+ * @param tableInfo     table schema to store; may be null
+ * @throws IOException  propagated from ObjectSerializationUtil.convertObjectToString
*/
- public static void setCarbonTable(Configuration configuration, CarbonTable carbonTable)
+ public static void setTableInfo(Configuration configuration, TableInfo tableInfo)
throws IOException {
- if (null != carbonTable) {
- configuration.set(CARBON_TABLE, ObjectSerializationUtil.convertObjectToString(carbonTable));
+ if (null != tableInfo) {
+ configuration.set(TABLE_INFO, ObjectSerializationUtil.convertObjectToString(tableInfo));
}
}
- public static CarbonTable getCarbonTable(Configuration configuration) throws IOException {
- String carbonTableStr = configuration.get(CARBON_TABLE);
- if (carbonTableStr == null) {
- populateCarbonTable(configuration);
- // read it from schema file in the store
- carbonTableStr = configuration.get(CARBON_TABLE);
- return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr);
- }
- return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr);
+ /**
+ * Get TableInfo object from `configuration`
+ * <p>
+ * Deserializes the string stored under TABLE_INFO by setTableInfo.
+ * NOTE(review): when the key is absent, {@code tableInfoStr} is null. getOrCreateCarbonTable
+ * treats a null result as "not set", which assumes convertStringToObject(null) returns null
+ * rather than throwing. Confirm.
+ */
+ private TableInfo getTableInfo(Configuration configuration) throws IOException {
+ String tableInfoStr = configuration.get(TABLE_INFO);
+ return (TableInfo) ObjectSerializationUtil.convertStringToObject(tableInfoStr);
}
/**
- * this method will read the schema from the physical file and populate into CARBON_TABLE
- * @param configuration
- * @throws IOException
+ * Get the cached CarbonTable or create it by TableInfo in `configuration`
+ * <p>
+ * The table is built once per input-format instance and cached in the {@code carbonTable}
+ * field. Later calls return the cached instance without re-reading {@code configuration}.
+ * When no TableInfo was stored via setTableInfo, the schema is read from the store, using
+ * the identifier that getAbsoluteTableIdentifier derives from the first INPUT_DIR entry.
+ * NOTE(review): the lazy initialisation is unsynchronized. Presumably each instance is used
+ * by a single thread; confirm.
*/
- private static void populateCarbonTable(Configuration configuration) throws IOException {
- String dirs = configuration.get(INPUT_DIR, "");
- String[] inputPaths = StringUtils.split(dirs);
- if (inputPaths.length == 0) {
- throw new InvalidPathException("No input paths specified in job");
+ private CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
+ if (carbonTable == null) {
+ // carbon table should be created either from deserialized table info (schema saved in
+ // hive metastore) or by reading schema in HDFS (schema saved in HDFS)
+ TableInfo tableInfo = getTableInfo(configuration);
+ CarbonTable carbonTable; // local shadows the cached field; stored into it below
+ if (tableInfo != null) {
+ carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
+ } else {
+ carbonTable = SchemaReader.readCarbonTableFromStore(
+ getAbsoluteTableIdentifier(configuration));
+ }
+ this.carbonTable = carbonTable;
+ return carbonTable;
+ } else {
+ return this.carbonTable;
}
- AbsoluteTableIdentifier absoluteTableIdentifier =
- AbsoluteTableIdentifier.fromTablePath(inputPaths[0]);
- // read the schema file to get the absoluteTableIdentifier having the correct table id
- // persisted in the schema
- CarbonTable carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier);
- setCarbonTable(configuration, carbonTable);
}
public static void setTablePath(Configuration configuration, String tablePath)
@@ -210,13 +210,17 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
* Set list of files to access
*/
public static void setFilesToAccess(Configuration configuration, List<String> validFiles) {
- configuration
- .set(CarbonInputFormat.INPUT_FILES, CarbonUtil.getSegmentString(validFiles));
+ configuration.set(CarbonInputFormat.INPUT_FILES, CarbonUtil.getSegmentString(validFiles));
}
- private static AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
+ private AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration) // derived from the input path only
throws IOException {
- return getCarbonTable(configuration).getAbsoluteTableIdentifier();
+ String dirs = configuration.get(INPUT_DIR, ""); // table location comes from the job's input dirs
+ String[] inputPaths = StringUtils.split(dirs);
+ if (inputPaths.length == 0) {
+ throw new InvalidPathException("No input paths specified in job");
+ }
+ return AbsoluteTableIdentifier.fromTablePath(inputPaths[0]); // NOTE(review): first path only; unlike the removed populateCarbonTable, the schema file is not read, so the table id may differ from the persisted one -- confirm callers only need path parts
}
/**
@@ -263,12 +267,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
// process and resolve the expression
Expression filter = getFilterPredicates(job.getConfiguration());
- CarbonTable carbonTable = getCarbonTable(job.getConfiguration());
- // this will be null in case of corrupt schema file.
- if (null == carbonTable) {
- throw new IOException("Missing/Corrupt schema file for table.");
- }
-
+ CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
CarbonInputFormatUtil.processFilterExpression(filter, carbonTable);
BitSet matchedPartitions = null;
PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName());
@@ -358,9 +357,8 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
Boolean isIUDTable = false;
AbsoluteTableIdentifier absoluteTableIdentifier =
- getCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier();
- SegmentUpdateStatusManager updateStatusManager =
- new SegmentUpdateStatusManager(absoluteTableIdentifier);
+ getCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier();
+ SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier);
isIUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0);
@@ -726,7 +724,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
public QueryModel getQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
throws IOException {
Configuration configuration = taskAttemptContext.getConfiguration();
- CarbonTable carbonTable = getCarbonTable(configuration);
+ CarbonTable carbonTable = getOrCreateCarbonTable(configuration);
// getting the table absoluteTableIdentifier from the carbonTable
// to avoid unnecessary deserialization
AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
index 9aa1188..86a3326 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
@@ -74,8 +74,8 @@ public class CarbonInputMapperTest extends TestCase {
Assert.assertEquals("Count lines are not matching", 1000, countTheLines(outPath));
Assert.assertEquals("Column count are not matching", 7, countTheColumns(outPath));
} catch (Exception e) {
- Assert.assertTrue("failed", false);
e.printStackTrace();
+ Assert.assertTrue("failed", false);
throw e;
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
index 88c182e..8cad313 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
@@ -270,7 +270,6 @@ public class StoreCreator {
+ absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
tableInfo.setLastUpdatedTime(System.currentTimeMillis());
tableInfo.setFactTable(tableSchema);
- tableInfo.setAggregateTableList(new ArrayList<TableSchema>());
CarbonTablePath carbonTablePath = CarbonStorePath
.getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
absoluteTableIdentifier.getCarbonTableIdentifier());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestTableIdTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestTableIdTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestTableIdTest.scala
deleted file mode 100644
index a6790c8..0000000
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestTableIdTest.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.testsuite.createTable
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.Job
-import org.junit.Assert
-import org.scalatest.BeforeAndAfterAll
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.hadoop.CarbonInputFormat
-import org.apache.spark.sql.test.util.QueryTest
-
-/**
- * test functionality related the case change for database name
- */
-class TestTableIdTest extends QueryTest with BeforeAndAfterAll {
-
- override def beforeAll: Unit = {
- sql("drop table if exists carbontable")
- }
-
- def validateTableId: Unit = {
- val carbonInputFormat: CarbonInputFormat[Array[Object]] = new CarbonInputFormat[Array[Object]]
- val jobConf: JobConf = new JobConf(new Configuration)
- val job: Job = Job.getInstance(jobConf)
- val storePath: String = storeLocation.replaceAll("\\\\", "/")
- job.getConfiguration
- .set("mapreduce.input.fileinputformat.inputdir",
- storePath + "/default/carbontable")
- val carbonTable: CarbonTable = CarbonInputFormat.getCarbonTable(job.getConfiguration)
- val getAbsoluteTableIdentifier = classOf[CarbonInputFormat[Array[Object]]]
- .getDeclaredMethod("getAbsoluteTableIdentifier", classOf[Configuration])
- getAbsoluteTableIdentifier.setAccessible(true)
- val absoluteTableIdentifier: AbsoluteTableIdentifier = getAbsoluteTableIdentifier
- .invoke(carbonInputFormat, job.getConfiguration).asInstanceOf[AbsoluteTableIdentifier]
-
- Assert
- .assertEquals(carbonTable.getCarbonTableIdentifier.getTableId,
- absoluteTableIdentifier.getCarbonTableIdentifier.getTableId)
- }
-
- test("test create table with database case name change") {
-
- try {
- // table creation should be successful
- sql("create table carbontable(a int, b string)stored by 'carbondata'")
- assert(true)
- } catch {
- case ex: Exception =>
- assert(false)
- }
- validateTableId
- }
-
- override def afterAll {
- sql("drop table if exists carbontable")
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
index 48e97ed..c908c08 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
@@ -17,11 +17,14 @@
package org.apache.carbondata.spark.rdd
+import java.io.{ByteArrayInputStream, DataInputStream}
+
import scala.reflect.ClassTag
import org.apache.spark.{Dependency, OneToOneDependency, Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
+import org.apache.carbondata.core.metadata.schema.table.TableInfo
import org.apache.carbondata.core.util.{CarbonSessionInfo, CarbonTaskInfo, SessionParams, ThreadLocalSessionInfo, ThreadLocalTaskInfo}
/**
@@ -47,3 +50,17 @@ abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
internalCompute(split, context)
}
}
+
+/**
+ * This RDD contains TableInfo object which is serialized and deserialized in driver and executor
+ *
+ * The schema is carried as the `serializedTableInfo` byte array, expected to be the output of
+ * `TableInfo.serialize()`. That array is a non-transient constructor field, so it is shipped
+ * with the RDD to executors, where [[getTableInfo]] rebuilds the TableInfo.
+ *
+ * @param sc                  Spark context; transient, used on the driver only
+ * @param deps                dependencies forwarded to [[CarbonRDD]]
+ * @param serializedTableInfo serialized TableInfo bytes
+ */
+abstract class CarbonRDDWithTableInfo[T: ClassTag](
+ @transient sc: SparkContext,
+ @transient private var deps: Seq[Dependency[_]],
+ serializedTableInfo: Array[Byte]) extends CarbonRDD[T](sc, deps) {
+
+ def this(@transient oneParent: RDD[_], serializedTableInfo: Array[Byte]) = // single one-to-one parent
+ this (oneParent.context, List(new OneToOneDependency(oneParent)), serializedTableInfo)
+
+ def getTableInfo: TableInfo = TableInfo.deserialize(serializedTableInfo) // deserializes on every call; cache the result if used repeatedly
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 3868342..85c4bc4 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -26,7 +26,6 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.spark._
-import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.hive.DistributionUtil
@@ -34,7 +33,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.block.Distributable
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.table.TableInfo
import org.apache.carbondata.core.scan.expression.Expression
import org.apache.carbondata.core.scan.model.QueryModel
import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants, QueryStatisticsRecorder}
@@ -53,8 +52,9 @@ class CarbonScanRDD(
columnProjection: CarbonProjection,
filterExpression: Expression,
identifier: AbsoluteTableIdentifier,
- @transient carbonTable: CarbonTable)
- extends CarbonRDD[InternalRow](sc, Nil) {
+ serializedTableInfo: Array[Byte],
+ @transient tableInfo: TableInfo)
+ extends CarbonRDDWithTableInfo[InternalRow](sc, Nil, serializedTableInfo) {
private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
private val jobTrackerId: String = {
@@ -65,7 +65,7 @@ class CarbonScanRDD(
private val readSupport = SparkReadSupport.readSupportClass
- private val bucketedTable = carbonTable.getBucketingInfo(carbonTable.getFactTableName)
+ private val bucketedTable = tableInfo.getFactTable.getBucketingInfo
@transient private val jobId = new JobID(jobTrackerId, id)
@transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
@@ -174,6 +174,7 @@ class CarbonScanRDD(
}
override def internalCompute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+
val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
if (null == carbonPropertiesFilePath) {
System.setProperty("carbon.properties.filepath",
@@ -246,12 +247,13 @@ class CarbonScanRDD(
}
private def prepareInputFormatForDriver(conf: Configuration): CarbonInputFormat[Object] = {
- CarbonInputFormat.setCarbonTable(conf, carbonTable)
+ CarbonInputFormat.setTableInfo(conf, tableInfo)
createInputFormat(conf)
}
private def prepareInputFormatForExecutor(conf: Configuration): CarbonInputFormat[Object] = {
CarbonInputFormat.setCarbonReadSupport(conf, readSupport)
+ CarbonInputFormat.setTableInfo(conf, getTableInfo)
createInputFormat(conf)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
index 664cbae..954303f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
@@ -266,7 +266,6 @@ object DataManagementFunc {
def prepareCarbonLoadModel(storePath: String,
table: CarbonTable,
newCarbonLoadModel: CarbonLoadModel): Unit = {
- newCarbonLoadModel.setAggTables(table.getAggregateTablesName.asScala.toArray)
newCarbonLoadModel.setTableName(table.getFactTableName)
val dataLoadSchema = new CarbonDataLoadSchema(table)
// Need to fill dimension relation
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index ee77f35..6174f7c 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -526,7 +526,6 @@ class TableNewProcessor(cm: TableModel) {
tableInfo.setTableUniqueName(cm.databaseName + "_" + cm.tableName)
tableInfo.setLastUpdatedTime(System.currentTimeMillis())
tableInfo.setFactTable(tableSchema)
- tableInfo.setAggregateTableList(new util.ArrayList[TableSchema]())
tableInfo
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
index 17d6065..3bd92e9 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
@@ -122,12 +122,14 @@ case class CarbonScan(
columnProjection.foreach { attr =>
projection.addColumn(attr.name)
}
+
new CarbonScanRDD(
ocRaw.sparkContext,
projection,
buildCarbonPlan.getFilterExpression,
carbonTable.getAbsoluteTableIdentifier,
- carbonTable
+ carbonTable.getTableInfo.serialize(),
+ carbonTable.getTableInfo
)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 44d5efb..da6dd98 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -128,7 +128,6 @@ private[sql] case class AlterTableCompaction(alterTableModel: AlterTableModel) e
val table = relation.tableMeta.carbonTable
- carbonLoadModel.setAggTables(table.getAggregateTablesName.asScala.toArray)
carbonLoadModel.setTableName(table.getFactTableName)
val dataLoadSchema = new CarbonDataLoadSchema(table)
// Need to fill dimension relation
@@ -404,7 +403,6 @@ case class LoadTable(
carbonLoadModel.setStorePath(relation.tableMeta.storePath)
val table = relation.tableMeta.carbonTable
- carbonLoadModel.setAggTables(table.getAggregateTablesName.asScala.toArray)
carbonLoadModel.setTableName(table.getFactTableName)
val dataLoadSchema = new CarbonDataLoadSchema(table)
// Need to fill dimension relation
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index d28044f..d1baf79 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql
+import java.io.{ByteArrayOutputStream, DataOutputStream}
+
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.rdd.RDD
@@ -28,15 +30,13 @@ import org.apache.spark.sql.types.StructType
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.scan.expression.Expression
import org.apache.carbondata.core.scan.expression.logical.AndExpression
-import org.apache.carbondata.core.util.{CarbonSessionInfo, SessionParams, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.{CarbonSessionInfo, ThreadLocalSessionInfo}
import org.apache.carbondata.hadoop.CarbonProjection
-import org.apache.carbondata.hadoop.util.SchemaReader
-import org.apache.carbondata.processing.merger.TableMeta
import org.apache.carbondata.spark.CarbonFilters
import org.apache.carbondata.spark.rdd.CarbonScanRDD
-import org.apache.carbondata.spark.util.CarbonSparkUtil
case class CarbonDatasourceHadoopRelation(
sparkSession: SparkSession,
@@ -46,15 +46,22 @@ case class CarbonDatasourceHadoopRelation(
isSubquery: ArrayBuffer[Boolean] = new ArrayBuffer[Boolean]())
extends BaseRelation with InsertableRelation {
- lazy val absIdentifier = AbsoluteTableIdentifier.fromTablePath(paths.head)
- lazy val carbonTable = carbonRelation.tableMeta.carbonTable
- lazy val carbonRelation: CarbonRelation = CarbonEnv.getInstance(sparkSession).carbonMetastore
- .lookupRelation(Some(absIdentifier.getCarbonTableIdentifier.getDatabaseName),
- absIdentifier.getCarbonTableIdentifier.getTableName)(sparkSession)
+ lazy val identifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.fromTablePath(paths.head)
+ lazy val databaseName: String = carbonTable.getDatabaseName
+ lazy val tableName: String = carbonTable.getFactTableName
+ lazy val carbonSessionInfo : CarbonSessionInfo =
+ CarbonEnv.getInstance(sparkSession).carbonSessionInfo
+ ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
+
+ @transient lazy val carbonRelation: CarbonRelation =
+ CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .lookupRelation(
+ Some(identifier.getCarbonTableIdentifier.getDatabaseName),
+ identifier.getCarbonTableIdentifier.getTableName)(sparkSession)
.asInstanceOf[CarbonRelation]
- val carbonSessionInfo : CarbonSessionInfo = CarbonEnv.getInstance(sparkSession).carbonSessionInfo
- ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
+ @transient lazy val carbonTable: CarbonTable = carbonRelation.tableMeta.carbonTable
+
override def sqlContext: SQLContext = sparkSession.sqlContext
override def schema: StructType = tableSchema.getOrElse(carbonRelation.schema)
@@ -67,15 +74,20 @@ case class CarbonDatasourceHadoopRelation(
val projection = new CarbonProjection
requiredColumns.foreach(projection.addColumn)
- new CarbonScanRDD(sqlContext.sparkContext, projection, filterExpression.orNull,
- absIdentifier, carbonTable)
+ new CarbonScanRDD(
+ sqlContext.sparkContext,
+ projection,
+ filterExpression.orNull,
+ identifier,
+ carbonTable.getTableInfo.serialize(),
+ carbonTable.getTableInfo)
}
override def unhandledFilters(filters: Array[Filter]): Array[Filter] = new Array[Filter](0)
override def toString: String = {
- "CarbonDatasourceHadoopRelation [ " + "Database name :" + carbonTable.getDatabaseName +
- ", " + "Table name :" + carbonTable.getFactTableName + ", Schema :" + tableSchema + " ]"
+ "CarbonDatasourceHadoopRelation [ " + "Database name :" + databaseName +
+ ", " + "Table name :" + tableName + ", Schema :" + tableSchema + " ]"
}
override def sizeInBytes: Long = carbonRelation.sizeInBytes
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index bd1c8b1..33091aa 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -40,7 +40,7 @@ import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
import org.apache.carbondata.core.util.DataTypeUtil
import org.apache.carbondata.spark.CarbonAliasDecoderRelation
-import org.apache.carbondata.spark.rdd.CarbonRDD
+import org.apache.carbondata.spark.rdd.{CarbonRDD, CarbonRDDWithTableInfo}
/**
* It decodes the data.
@@ -444,10 +444,9 @@ class CarbonDecoderRDD(
aliasMap: CarbonAliasDecoderRelation,
prev: RDD[InternalRow],
output: Seq[Attribute],
- sparkSession: SparkSession)
- extends CarbonRDD[InternalRow](prev) {
-
- private val storepath = CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath
+ storePath: String,
+ serializedTableInfo: Array[Byte])
+ extends CarbonRDDWithTableInfo[InternalRow](prev, serializedTableInfo) {
def canBeDecoded(attr: Attribute): Boolean = {
profile match {
@@ -516,13 +515,13 @@ class CarbonDecoderRDD(
override def internalCompute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
val absoluteTableIdentifiers = relations.map { relation =>
- val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable
- (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)
+ val tableInfo = getTableInfo
+ (tableInfo.getFactTable.getTableName, tableInfo.getOrCreateAbsoluteTableIdentifier)
}.toMap
val cacheProvider: CacheProvider = CacheProvider.getInstance
val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
- cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, storepath)
+ cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, storePath)
val dicts: Seq[Dictionary] = getDictionary(absoluteTableIdentifiers,
forwardDictionaryCache)
val dictIndex = dicts.zipWithIndex.filter(x => x._1 != null).map(x => x._2)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index a206bef..1cc6668 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -17,9 +17,6 @@
package org.apache.spark.sql.execution
-import java.text.SimpleDateFormat
-import java.util.Date
-
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
@@ -34,14 +31,13 @@ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partition
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.optimizer.CarbonDecoderRelation
import org.apache.spark.sql.sources.{BaseRelation, Filter}
-import org.apache.spark.sql.types.{AtomicType, DoubleType, IntegerType, StringType, TimestampType}
+import org.apache.spark.sql.types.{AtomicType, IntegerType, StringType}
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.TimeStampDirectDictionaryGenerator
import org.apache.carbondata.core.metadata.schema.BucketingInfo
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.{CarbonAliasDecoderRelation}
+import org.apache.carbondata.spark.CarbonAliasDecoderRelation
import org.apache.carbondata.spark.rdd.CarbonScanRDD
import org.apache.carbondata.spark.util.CarbonScalaUtil
@@ -99,8 +95,15 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
relation.addAttribute(newAttr)
newAttr
}
- new CarbonDecoderRDD(Seq(relation), IncludeProfile(attrs),
- CarbonAliasDecoderRelation(), rdd, output, SparkSession.getActiveSession.get)
+
+ new CarbonDecoderRDD(
+ Seq(relation),
+ IncludeProfile(attrs),
+ CarbonAliasDecoderRelation(),
+ rdd,
+ output,
+ CarbonEnv.getInstance(SparkSession.getActiveSession.get).carbonMetastore.storePath,
+ table.carbonTable.getTableInfo.serialize())
}
private[this] def toCatalystRDD(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
index 0894f23..2fccd0c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
@@ -795,8 +795,8 @@ object UpdateExecution {
def isDestinationRelation(relation: CarbonDatasourceHadoopRelation): Boolean = {
- val tableName = relation.absIdentifier.getCarbonTableIdentifier.getTableName
- val dbName = relation.absIdentifier.getCarbonTableIdentifier.getDatabaseName
+ val tableName = relation.identifier.getCarbonTableIdentifier.getTableName
+ val dbName = relation.identifier.getCarbonTableIdentifier.getDatabaseName
(tableIdentifier.size > 1 &&
tableIdentifier(0) == dbName &&
tableIdentifier(1) == tableName) ||
@@ -841,8 +841,8 @@ object UpdateExecution {
val header = getHeader(carbonRelation, plan)
LoadTable(
- Some(carbonRelation.absIdentifier.getCarbonTableIdentifier.getDatabaseName),
- carbonRelation.absIdentifier.getCarbonTableIdentifier.getTableName,
+ Some(carbonRelation.identifier.getCarbonTableIdentifier.getDatabaseName),
+ carbonRelation.identifier.getCarbonTableIdentifier.getTableName,
null,
Seq(),
Map(("fileheader" -> header)),
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 0391cb3..136078c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -126,7 +126,6 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) extends Runnab
val carbonLoadModel = new CarbonLoadModel()
val table = relation.tableMeta.carbonTable
- carbonLoadModel.setAggTables(table.getAggregateTablesName.asScala.toArray)
carbonLoadModel.setTableName(table.getFactTableName)
val dataLoadSchema = new CarbonDataLoadSchema(table)
// Need to fill dimension relation
@@ -495,7 +494,6 @@ case class LoadTable(
carbonLoadModel.setStorePath(relation.tableMeta.storePath)
val table = relation.tableMeta.carbonTable
- carbonLoadModel.setAggTables(table.getAggregateTablesName.asScala.toArray)
carbonLoadModel.setTableName(table.getFactTableName)
val dataLoadSchema = new CarbonDataLoadSchema(table)
// Need to fill dimension relation
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
index 21ef6c4..ca9bb47 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
@@ -488,13 +488,6 @@ public class CarbonLoadModel implements Serializable {
}
/**
- * @param aggTables the aggTables to set
- */
- public void setAggTables(String[] aggTables) {
- this.aggTables = aggTables;
- }
-
- /**
* @param storePath The storePath to set.
*/
public void setStorePath(String storePath) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
index 91cc195..a7c2057 100644
--- a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
+++ b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
@@ -261,7 +261,6 @@ public class StoreCreator {
+ absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
tableInfo.setLastUpdatedTime(System.currentTimeMillis());
tableInfo.setFactTable(tableSchema);
- tableInfo.setAggregateTableList(new ArrayList<TableSchema>());
CarbonTablePath carbonTablePath = CarbonStorePath
.getCarbonTablePath(absoluteTableIdentifier.getStorePath(),