You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/07/27 12:42:07 UTC
[7/7] carbondata git commit: Rebase datamap branch onto master
Rebase datamap branch onto master
This closes #1196
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/79feac96
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/79feac96
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/79feac96
Branch: refs/heads/master
Commit: 79feac96ae789851c5ad7306a7acaaba25d8e6c9
Parents: b681244
Author: Raghunandan S <ra...@gmail.com>
Authored: Thu Jul 27 20:38:48 2017 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Thu Jul 27 20:41:22 2017 +0800
----------------------------------------------------------------------
.../core/indexstore/UnsafeMemoryDMStore.java | 31 +-
.../blockletindex/BlockletDataMap.java | 11 +-
.../core/indexstore/row/DataMapRow.java | 4 +-
.../core/indexstore/row/DataMapRowImpl.java | 4 +
.../core/indexstore/row/UnsafeDataMapRow.java | 40 +-
.../core/memory/UnsafeMemoryManager.java | 19 +-
.../datatype/DecimalConverterFactory.java | 2 +-
.../hadoop/api/CarbonTableInputFormat.java | 83 ++--
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 2 +-
.../execution/command/carbonTableSchema.scala | 6 +-
.../spark/sql/hive/CarbonFileMetastore.scala | 392 +++++++++----------
11 files changed, 301 insertions(+), 293 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
index 8246f99..13951dc 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
@@ -19,9 +19,10 @@ package org.apache.carbondata.core.indexstore;
import org.apache.carbondata.core.indexstore.row.DataMapRow;
import org.apache.carbondata.core.indexstore.row.UnsafeDataMapRow;
import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
-import org.apache.carbondata.core.memory.MemoryAllocator;
-import org.apache.carbondata.core.memory.MemoryAllocatorFactory;
import org.apache.carbondata.core.memory.MemoryBlock;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.memory.UnsafeMemoryManager;
+import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
import static org.apache.carbondata.core.memory.CarbonUnsafe.BYTE_ARRAY_OFFSET;
import static org.apache.carbondata.core.memory.CarbonUnsafe.unsafe;
@@ -39,8 +40,6 @@ public class UnsafeMemoryDMStore {
private int runningLength;
- private MemoryAllocator memoryAllocator;
-
private boolean isMemoryFreed;
private DataMapSchema[] schema;
@@ -49,11 +48,13 @@ public class UnsafeMemoryDMStore {
private int rowCount;
- public UnsafeMemoryDMStore(DataMapSchema[] schema) {
+ private final long taskId = null != ThreadLocalTaskInfo.getCarbonTaskInfo() ?
+ ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId() : System.nanoTime();
+
+ public UnsafeMemoryDMStore(DataMapSchema[] schema) throws MemoryException {
this.schema = schema;
- this.memoryAllocator = MemoryAllocatorFactory.INSATANCE.getMemoryAllocator();
this.allocatedSize = capacity;
- this.memoryBlock = memoryAllocator.allocate(allocatedSize);
+ this.memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, allocatedSize);
this.pointers = new int[1000];
}
@@ -63,13 +64,13 @@ public class UnsafeMemoryDMStore {
*
* @param rowSize
*/
- private void ensureSize(int rowSize) {
+ private void ensureSize(int rowSize) throws MemoryException {
if (runningLength + rowSize >= allocatedSize) {
MemoryBlock allocate =
- MemoryAllocatorFactory.INSATANCE.getMemoryAllocator().allocate(allocatedSize + capacity);
+ UnsafeMemoryManager.allocateMemoryWithRetry(taskId, allocatedSize + capacity);
unsafe.copyMemory(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset(),
allocate.getBaseObject(), allocate.getBaseOffset(), runningLength);
- memoryAllocator.free(memoryBlock);
+ UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock);
allocatedSize = allocatedSize + capacity;
memoryBlock = allocate;
}
@@ -86,7 +87,7 @@ public class UnsafeMemoryDMStore {
* @param indexRow
* @return
*/
- public void addIndexRowToUnsafe(DataMapRow indexRow) {
+ public void addIndexRowToUnsafe(DataMapRow indexRow) throws MemoryException {
// First calculate the required memory to keep the row in unsafe
int rowSize = indexRow.getTotalSizeInBytes();
// Check whether allocated memory is sufficient or not.
@@ -168,13 +169,13 @@ public class UnsafeMemoryDMStore {
return new UnsafeDataMapRow(schema, memoryBlock, pointers[index]);
}
- public void finishWriting() {
+ public void finishWriting() throws MemoryException {
if (runningLength < allocatedSize) {
MemoryBlock allocate =
- MemoryAllocatorFactory.INSATANCE.getMemoryAllocator().allocate(runningLength);
+ UnsafeMemoryManager.allocateMemoryWithRetry(taskId, runningLength);
unsafe.copyMemory(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset(),
allocate.getBaseObject(), allocate.getBaseOffset(), runningLength);
- memoryAllocator.free(memoryBlock);
+ UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock);
memoryBlock = allocate;
}
// Compact pointers.
@@ -187,7 +188,7 @@ public class UnsafeMemoryDMStore {
public void freeMemory() {
if (!isMemoryFreed) {
- memoryAllocator.free(memoryBlock);
+ UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock);
isMemoryFreed = true;
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 79aa091..680852d 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -44,6 +44,7 @@ import org.apache.carbondata.core.indexstore.row.DataMapRow;
import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
@@ -110,7 +111,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
if (unsafeMemoryDMStore != null) {
unsafeMemoryDMStore.finishWriting();
}
- } catch (IOException e) {
+ } catch (Exception e) {
throw new RuntimeException(e);
}
}
@@ -156,11 +157,11 @@ public class BlockletDataMap implements DataMap, Cacheable {
DataOutput dataOutput = new DataOutputStream(stream);
blockletInfo.write(dataOutput);
serializedData = stream.toByteArray();
- } catch (IOException e) {
+ row.setByteArray(serializedData, ordinal);
+ unsafeMemoryDMStore.addIndexRowToUnsafe(row);
+ } catch (Exception e) {
throw new RuntimeException(e);
}
- row.setByteArray(serializedData, ordinal);
- unsafeMemoryDMStore.addIndexRowToUnsafe(row);
}
}
@@ -176,7 +177,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
return minRow;
}
- private void createSchema(SegmentProperties segmentProperties) {
+ private void createSchema(SegmentProperties segmentProperties) throws MemoryException {
List<DataMapSchema> indexSchemas = new ArrayList<>();
// Index key
http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
index defe766..631e0ad 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
@@ -62,6 +62,8 @@ public abstract class DataMapRow {
public abstract double getDouble(int ordinal);
+ public abstract int getLengthInBytes(int ordinal);
+
public int getTotalSizeInBytes() {
int len = 0;
for (int i = 0; i < schemas.length; i++) {
@@ -75,7 +77,7 @@ public abstract class DataMapRow {
case FIXED:
return schemas[ordinal].getLength();
case VARIABLE:
- return getByteArray(ordinal).length + 2;
+ return getLengthInBytes(ordinal) + 2;
case STRUCT:
return getRow(ordinal).getTotalSizeInBytes();
default:
http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
index adec346..32d15d3 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
@@ -35,6 +35,10 @@ public class DataMapRowImpl extends DataMapRow {
return (byte[]) data[ordinal];
}
+ @Override public int getLengthInBytes(int ordinal) {
+ return ((byte[]) data[ordinal]).length;
+ }
+
@Override public DataMapRow getRow(int ordinal) {
return (DataMapRow) data[ordinal];
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
index ef78514..c398115 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
@@ -55,6 +55,31 @@ public class UnsafeDataMapRow extends DataMapRow {
return data;
}
+ @Override public int getLengthInBytes(int ordinal) {
+ int length;
+ int position = getPosition(ordinal);
+ switch (schemas[ordinal].getSchemaType()) {
+ case VARIABLE:
+ length = unsafe.getShort(block.getBaseObject(), block.getBaseOffset() + pointer + position);
+ break;
+ default:
+ length = schemas[ordinal].getLength();
+ }
+ return length;
+ }
+
+ private int getLengthInBytes(int ordinal, int position) {
+ int length;
+ switch (schemas[ordinal].getSchemaType()) {
+ case VARIABLE:
+ length = unsafe.getShort(block.getBaseObject(), block.getBaseOffset() + pointer + position);
+ break;
+ default:
+ length = schemas[ordinal].getLength();
+ }
+ return length;
+ }
+
@Override public DataMapRow getRow(int ordinal) {
DataMapSchema[] childSchemas =
((DataMapSchema.StructDataMapSchema) schemas[ordinal]).getChildSchemas();
@@ -123,10 +148,23 @@ public class UnsafeDataMapRow extends DataMapRow {
throw new UnsupportedOperationException("Not supported to set on unsafe row");
}
+ private int getSizeInBytes(int ordinal, int position) {
+ switch (schemas[ordinal].getSchemaType()) {
+ case FIXED:
+ return schemas[ordinal].getLength();
+ case VARIABLE:
+ return getLengthInBytes(ordinal, position) + 2;
+ case STRUCT:
+ return getRow(ordinal).getTotalSizeInBytes();
+ default:
+ throw new UnsupportedOperationException("wrong type");
+ }
+ }
+
private int getPosition(int ordinal) {
int position = 0;
for (int i = 0; i < ordinal; i++) {
- position += getSizeInBytes(i);
+ position += getSizeInBytes(i, position);
}
return position;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
index 991bc90..d433b5e 100644
--- a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
@@ -90,33 +90,28 @@ public class UnsafeMemoryManager {
if (memoryUsed + memoryRequested <= totalMemory) {
MemoryBlock allocate = allocator.allocate(memoryRequested);
memoryUsed += allocate.size();
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(
- "Working Memory block (" + allocate + ") is created with size " + allocate.size()
- + ". Total memory used " + memoryUsed + "Bytes, left " + (totalMemory - memoryUsed)
- + "Bytes");
- }
Set<MemoryBlock> listOfMemoryBlock = taskIdToMemoryBlockMap.get(taskId);
if (null == listOfMemoryBlock) {
listOfMemoryBlock = new HashSet<>();
taskIdToMemoryBlockMap.put(taskId, listOfMemoryBlock);
}
listOfMemoryBlock.add(allocate);
+ LOGGER.info("Memory block (" + allocate + ") is created with size " + allocate.size()
+ + ". Total memory used " + memoryUsed + "Bytes, left " + (totalMemory - memoryUsed)
+ + "Bytes");
return allocate;
}
return null;
}
- public synchronized void freeMemory(long taskId,MemoryBlock memoryBlock) {
+ public synchronized void freeMemory(long taskId, MemoryBlock memoryBlock) {
taskIdToMemoryBlockMap.get(taskId).remove(memoryBlock);
allocator.free(memoryBlock);
memoryUsed -= memoryBlock.size();
memoryUsed = memoryUsed < 0 ? 0 : memoryUsed;
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(
- "Freeing memory of size: " + memoryBlock.size() + ": Current available memory is: " + (
- totalMemory - memoryUsed));
- }
+ LOGGER.info(
+ "Freeing memory of size: " + memoryBlock.size() + "available memory: " + (totalMemory
+ - memoryUsed));
}
public void freeMemoryAll(long taskId) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
index 555df1c..459eb24 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
@@ -29,7 +29,7 @@ import org.apache.carbondata.core.util.DataTypeUtil;
*/
public final class DecimalConverterFactory {
- public static DecimalConverterFactory INSTANCE = new DecimalConverterFactory();
+ public static final DecimalConverterFactory INSTANCE = new DecimalConverterFactory();
private int[] minBytesForPrecision = minBytesForPrecision();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index e73c04a..9e6e284 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -38,6 +38,7 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.metadata.schema.PartitionInfo;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
import org.apache.carbondata.core.mutate.UpdateVO;
@@ -99,59 +100,56 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
private static final String FILTER_PREDICATE =
"mapreduce.input.carboninputformat.filter.predicate";
private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection";
- private static final String CARBON_TABLE = "mapreduce.input.carboninputformat.table";
+ private static final String TABLE_INFO = "mapreduce.input.carboninputformat.tableinfo";
private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
+ // cache of the carbon table; it is used on the task side
+ private CarbonTable carbonTable;
+
/**
- * It is optional, if user does not set then it reads from store
- *
- * @param configuration
- * @param carbonTable
- * @throws IOException
+ * Set the `tableInfo` in `configuration`
*/
- public static void setCarbonTable(Configuration configuration, CarbonTable carbonTable)
+ public static void setTableInfo(Configuration configuration, TableInfo tableInfo)
throws IOException {
- if (null != carbonTable) {
- configuration.set(CARBON_TABLE, ObjectSerializationUtil.convertObjectToString(carbonTable));
+ if (null != tableInfo) {
+ configuration.set(TABLE_INFO, ObjectSerializationUtil.convertObjectToString(tableInfo));
}
}
- public static CarbonTable getCarbonTable(Configuration configuration) throws IOException {
- String carbonTableStr = configuration.get(CARBON_TABLE);
- if (carbonTableStr == null) {
- populateCarbonTable(configuration);
- // read it from schema file in the store
- carbonTableStr = configuration.get(CARBON_TABLE);
- return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr);
- }
- return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr);
+ /**
+ * Get TableInfo object from `configuration`
+ */
+ private TableInfo getTableInfo(Configuration configuration) throws IOException {
+ String tableInfoStr = configuration.get(TABLE_INFO);
+ return (TableInfo) ObjectSerializationUtil.convertStringToObject(tableInfoStr);
}
/**
- * this method will read the schema from the physical file and populate into CARBON_TABLE
- *
- * @param configuration
- * @throws IOException
+ * Get the cached CarbonTable or create it by TableInfo in `configuration`
*/
- private static void populateCarbonTable(Configuration configuration) throws IOException {
- String dirs = configuration.get(INPUT_DIR, "");
- String[] inputPaths = StringUtils.split(dirs);
- if (inputPaths.length == 0) {
- throw new InvalidPathException("No input paths specified in job");
+ private CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
+ if (carbonTable == null) {
+ // carbon table should be created either from deserialized table info (schema saved in
+ // hive metastore) or by reading schema in HDFS (schema saved in HDFS)
+ TableInfo tableInfo = getTableInfo(configuration);
+ CarbonTable carbonTable;
+ if (tableInfo != null) {
+ carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
+ } else {
+ carbonTable = SchemaReader.readCarbonTableFromStore(
+ getAbsoluteTableIdentifier(configuration));
+ }
+ this.carbonTable = carbonTable;
+ return carbonTable;
+ } else {
+ return this.carbonTable;
}
- AbsoluteTableIdentifier absoluteTableIdentifier =
- AbsoluteTableIdentifier.fromTablePath(inputPaths[0]);
- // read the schema file to get the absoluteTableIdentifier having the correct table id
- // persisted in the schema
- CarbonTable carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier);
- setCarbonTable(configuration, carbonTable);
}
public static void setTablePath(Configuration configuration, String tablePath)
throws IOException {
configuration.set(FileInputFormat.INPUT_DIR, tablePath);
}
-
/**
* It sets unresolved filter expression.
*
@@ -213,9 +211,14 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
configuration.set(INPUT_FILES, CarbonUtil.getSegmentString(validFiles));
}
- private static AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
+ private AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
throws IOException {
- return getCarbonTable(configuration).getAbsoluteTableIdentifier();
+ String dirs = configuration.get(INPUT_DIR, "");
+ String[] inputPaths = StringUtils.split(dirs);
+ if (inputPaths.length == 0) {
+ throw new InvalidPathException("No input paths specified in job");
+ }
+ return AbsoluteTableIdentifier.fromTablePath(inputPaths[0]);
}
/**
@@ -262,7 +265,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
// process and resolve the expression
Expression filter = getFilterPredicates(job.getConfiguration());
- CarbonTable carbonTable = getCarbonTable(job.getConfiguration());
+ CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
// this will be null in case of corrupt schema file.
if (null == carbonTable) {
throw new IOException("Missing/Corrupt schema file for table.");
@@ -277,7 +280,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
if (null != partitionInfo) {
Partitioner partitioner = PartitionUtil.getPartitioner(partitionInfo);
matchedPartitions = new FilterExpressionProcessor()
- .getFilteredPartitions(filter, partitionInfo, partitioner);
+ .getFilteredPartitions(filter, partitionInfo);
if (matchedPartitions.cardinality() == 0) {
// no partition is required
return new ArrayList<InputSplit>();
@@ -320,7 +323,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
Boolean isIUDTable = false;
AbsoluteTableIdentifier absoluteTableIdentifier =
- getCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier();
+ getOrCreateCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier();
SegmentUpdateStatusManager updateStatusManager =
new SegmentUpdateStatusManager(absoluteTableIdentifier);
@@ -432,7 +435,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
public QueryModel getQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
throws IOException {
Configuration configuration = taskAttemptContext.getConfiguration();
- CarbonTable carbonTable = getCarbonTable(configuration);
+ CarbonTable carbonTable = getOrCreateCarbonTable(configuration);
// getting the table absoluteTableIdentifier from the carbonTable
// to avoid unnecessary deserialization
AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 1a8183c..add0578 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -265,7 +265,7 @@ class CarbonMergerRDD[K, V](
val jobConf: JobConf = new JobConf(new Configuration)
val job: Job = new Job(jobConf)
val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
- CarbonInputFormat.setTableInfo(job.getConfiguration,
+ CarbonTableInputFormat.setTableInfo(job.getConfiguration,
carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getTableInfo)
var updateDetails: UpdateVO = null
// initialise query_id for job
http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index f3baf58..d34b91d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -268,7 +268,7 @@ case class DeleteLoadsById(
override def processData(sparkSession: SparkSession): Seq[Row] = {
Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
- lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
+ lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
tableMeta.carbonTable
CarbonStore.deleteLoadById(
loadids,
@@ -293,7 +293,7 @@ case class DeleteLoadsByLoadDate(
override def processData(sparkSession: SparkSession): Seq[Row] = {
Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
- lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
+ lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
tableMeta.carbonTable
CarbonStore.deleteLoadByDate(
loadDate,
@@ -847,7 +847,7 @@ case class ShowLoads(
override def processData(sparkSession: SparkSession): Seq[Row] = {
Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
- lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
+ lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
tableMeta.carbonTable
CarbonStore.showSegments(
getDB.getDatabaseName(databaseNameOp, sparkSession),
http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 549841b..c9eaf6d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -22,22 +22,23 @@ import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable.ArrayBuffer
-import org.apache.spark.sql.{RuntimeConfig, SparkSession}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, RuntimeConfig, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.datastore.impl.FileFactory.FileType
import org.apache.carbondata.core.fileoperations.FileWriteOperation
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
+import org.apache.carbondata.core.metadata.schema
+import org.apache.carbondata.core.metadata.schema.table
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
+import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
import org.apache.carbondata.core.writer.ThriftWriter
import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
@@ -62,7 +63,7 @@ case class DictionaryMap(dictionaryMap: Map[String, Boolean]) {
}
}
-class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends CarbonMetaStore {
+class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
@transient
val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.CarbonMetastoreCatalog")
@@ -77,7 +78,7 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
System.nanoTime() + ""
}
- lazy val metadata = loadMetadata(storePath, nextQueryId)
+ val metadata = MetaData(new ArrayBuffer[TableMeta]())
/**
@@ -90,9 +91,22 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
override def createCarbonRelation(parameters: Map[String, String],
absIdentifier: AbsoluteTableIdentifier,
sparkSession: SparkSession): CarbonRelation = {
- lookupRelation(TableIdentifier(absIdentifier.getCarbonTableIdentifier.getTableName,
- Some(absIdentifier.getCarbonTableIdentifier.getDatabaseName)))(sparkSession)
- .asInstanceOf[CarbonRelation]
+ val database = absIdentifier.getCarbonTableIdentifier.getDatabaseName
+ val tableName = absIdentifier.getCarbonTableIdentifier.getTableName
+ val tables = getTableFromMetadataCache(database, tableName)
+ tables match {
+ case Some(t) =>
+ CarbonRelation(database, tableName,
+ CarbonSparkUtil.createSparkMeta(t.carbonTable), t)
+ case None =>
+ readCarbonSchema(absIdentifier) match {
+ case Some(meta) =>
+ CarbonRelation(database, tableName,
+ CarbonSparkUtil.createSparkMeta(meta.carbonTable), meta)
+ case None =>
+ throw new NoSuchTableException(database, tableName)
+ }
+ }
}
def lookupRelation(dbName: Option[String], tableName: String)
@@ -100,20 +114,21 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
lookupRelation(TableIdentifier(tableName, dbName))(sparkSession)
}
- def lookupRelation(tableIdentifier: TableIdentifier)
+ override def lookupRelation(tableIdentifier: TableIdentifier)
(sparkSession: SparkSession): LogicalPlan = {
- checkSchemasModifiedTimeAndReloadTables()
val database = tableIdentifier.database.getOrElse(
- sparkSession.catalog.currentDatabase
- )
- val tables = getTableFromMetadata(database, tableIdentifier.table, true)
- tables match {
- case Some(t) =>
- CarbonRelation(database, tableIdentifier.table,
- CarbonSparkUtil.createSparkMeta(tables.head.carbonTable), tables.head)
- case None =>
- throw new NoSuchTableException(database, tableIdentifier.table)
+ sparkSession.catalog.currentDatabase)
+ val relation = sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
+ case SubqueryAlias(_,
+ LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _),
+ _) =>
+ carbonDatasourceHadoopRelation.carbonRelation
+ case LogicalRelation(
+ carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
+ carbonDatasourceHadoopRelation.carbonRelation
+ case _ => throw new NoSuchTableException(database, tableIdentifier.table)
}
+ relation
}
/**
@@ -123,11 +138,10 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
* @param tableName
* @return
*/
- def getTableFromMetadata(database: String,
- tableName: String, readStore: Boolean = false): Option[TableMeta] = {
+ def getTableFromMetadataCache(database: String, tableName: String): Option[TableMeta] = {
metadata.tablesMeta
.find(c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
- c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))
+ c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))
}
def tableExists(
@@ -136,99 +150,48 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
tableExists(TableIdentifier(table, databaseOp))(sparkSession)
}
- def tableExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
- checkSchemasModifiedTimeAndReloadTables()
- val database = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
- val tables = metadata.tablesMeta.filter(
- c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
- c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableIdentifier.table))
- tables.nonEmpty
- }
-
- def loadMetadata(metadataPath: String, queryId: String): MetaData = {
- val recorder = CarbonTimeStatisticsFactory.createDriverRecorder()
- val statistic = new QueryStatistic()
- // creating zookeeper instance once.
- // if zookeeper is configured as carbon lock type.
- val zookeeperurl = conf.get(CarbonCommonConstants.ZOOKEEPER_URL, null)
- if (null != zookeeperurl) {
- CarbonProperties.getInstance
- .addProperty(CarbonCommonConstants.ZOOKEEPER_URL, zookeeperurl)
- }
- if (metadataPath == null) {
- return null
- }
- // if no locktype is configured and store type is HDFS set HDFS lock as default
- if (null == CarbonProperties.getInstance
- .getProperty(CarbonCommonConstants.LOCK_TYPE) &&
- FileType.HDFS == FileFactory.getFileType(metadataPath)) {
- CarbonProperties.getInstance
- .addProperty(CarbonCommonConstants.LOCK_TYPE,
- CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS
- )
- LOGGER.info("Default lock type HDFSLOCK is configured")
+ override def tableExists(tableIdentifier: TableIdentifier)
+ (sparkSession: SparkSession): Boolean = {
+ try {
+ lookupRelation(tableIdentifier)(sparkSession)
+ } catch {
+ case e: Exception =>
+ return false
}
- val fileType = FileFactory.getFileType(metadataPath)
- val metaDataBuffer = new ArrayBuffer[TableMeta]
- fillMetaData(metadataPath, fileType, metaDataBuffer)
- updateSchemasUpdatedTime(readSchemaFileSystemTime("", ""))
- statistic.addStatistics(QueryStatisticsConstants.LOAD_META,
- System.currentTimeMillis())
- recorder.recordStatisticsForDriver(statistic, queryId)
- MetaData(metaDataBuffer)
+ true
}
- private def fillMetaData(basePath: String, fileType: FileType,
- metaDataBuffer: ArrayBuffer[TableMeta]): Unit = {
- val databasePath = basePath // + "/schemas"
- try {
- if (FileFactory.isFileExist(databasePath, fileType)) {
- val file = FileFactory.getCarbonFile(databasePath, fileType)
- val databaseFolders = file.listFiles()
-
- databaseFolders.foreach(databaseFolder => {
- if (databaseFolder.isDirectory) {
- val dbName = databaseFolder.getName
- val tableFolders = databaseFolder.listFiles()
-
- tableFolders.foreach(tableFolder => {
- if (tableFolder.isDirectory) {
- val carbonTableIdentifier = new CarbonTableIdentifier(databaseFolder.getName,
- tableFolder.getName, UUID.randomUUID().toString)
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(basePath,
- carbonTableIdentifier)
- val tableMetadataFile = carbonTablePath.getSchemaFilePath
-
- if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
- val tableName = tableFolder.getName
- val tableUniqueName = databaseFolder.getName + "_" + tableFolder.getName
- val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile)
- val schemaConverter = new ThriftWrapperSchemaConverterImpl
- val wrapperTableInfo = schemaConverter
- .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, basePath)
- val schemaFilePath = CarbonStorePath
- .getCarbonTablePath(storePath, carbonTableIdentifier).getSchemaFilePath
- wrapperTableInfo.setStorePath(storePath)
- wrapperTableInfo
- .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
- CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
- val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
- metaDataBuffer += new TableMeta(carbonTable.getCarbonTableIdentifier, storePath,
- carbonTable)
- }
- }
- })
- }
- })
- } else {
- // Create folders and files.
- FileFactory.mkdirs(databasePath, fileType)
- }
- } catch {
- case s: java.io.FileNotFoundException =>
- s.printStackTrace()
- // Create folders and files.
- FileFactory.mkdirs(databasePath, fileType)
+ private def readCarbonSchema(identifier: AbsoluteTableIdentifier): Option[TableMeta] = {
+ val dbName = identifier.getCarbonTableIdentifier.getDatabaseName
+ val tableName = identifier.getCarbonTableIdentifier.getTableName
+ val storePath = identifier.getStorePath
+ val carbonTableIdentifier = new CarbonTableIdentifier(dbName.toLowerCase(),
+ tableName.toLowerCase(), UUID.randomUUID().toString)
+ val carbonTablePath =
+ CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
+ val tableMetadataFile = carbonTablePath.getSchemaFilePath
+ val fileType = FileFactory.getFileType(tableMetadataFile)
+ if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
+ val tableUniqueName = dbName + "_" + tableName
+ val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile)
+ val schemaConverter = new ThriftWrapperSchemaConverterImpl
+ val wrapperTableInfo = schemaConverter
+ .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, storePath)
+ val schemaFilePath = CarbonStorePath
+ .getCarbonTablePath(storePath, carbonTableIdentifier).getSchemaFilePath
+ wrapperTableInfo.setStorePath(storePath)
+ wrapperTableInfo
+ .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
+ CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
+ val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
+ val tableMeta = new TableMeta(carbonTable.getCarbonTableIdentifier,
+ identifier.getStorePath,
+ identifier.getTablePath,
+ carbonTable)
+ metadata.tablesMeta += tableMeta
+ Some(tableMeta)
+ } else {
+ None
}
}
@@ -238,28 +201,36 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
* @param newTableIdentifier
* @param thriftTableInfo
* @param schemaEvolutionEntry
- * @param carbonStorePath
+ * @param tablePath
* @param sparkSession
*/
def updateTableSchema(newTableIdentifier: CarbonTableIdentifier,
oldTableIdentifier: CarbonTableIdentifier,
thriftTableInfo: org.apache.carbondata.format.TableInfo,
schemaEvolutionEntry: SchemaEvolutionEntry,
- carbonStorePath: String)
- (sparkSession: SparkSession): String = {
+ tablePath: String) (sparkSession: SparkSession): String = {
+ val absoluteTableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
val schemaConverter = new ThriftWrapperSchemaConverterImpl
if (schemaEvolutionEntry != null) {
thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry)
}
val wrapperTableInfo = schemaConverter
.fromExternalToWrapperTableInfo(thriftTableInfo,
- newTableIdentifier.getDatabaseName,
- newTableIdentifier.getTableName,
- carbonStorePath)
- createSchemaThriftFile(wrapperTableInfo,
+ newTableIdentifier.getDatabaseName,
+ newTableIdentifier.getTableName,
+ absoluteTableIdentifier.getStorePath)
+ val identifier =
+ new CarbonTableIdentifier(newTableIdentifier.getDatabaseName,
+ newTableIdentifier.getTableName,
+ wrapperTableInfo.getFactTable.getTableId)
+ val path = createSchemaThriftFile(wrapperTableInfo,
thriftTableInfo,
- newTableIdentifier.getDatabaseName,
- newTableIdentifier.getTableName)(sparkSession)
+ identifier)
+ addTableCache(wrapperTableInfo,
+ AbsoluteTableIdentifier.from(absoluteTableIdentifier.getStorePath,
+ newTableIdentifier.getDatabaseName,
+ newTableIdentifier.getTableName))
+ path
}
/**
@@ -267,25 +238,27 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
*
* @param carbonTableIdentifier
* @param thriftTableInfo
- * @param carbonStorePath
+ * @param tablePath
* @param sparkSession
*/
def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier,
thriftTableInfo: org.apache.carbondata.format.TableInfo,
- carbonStorePath: String)
- (sparkSession: SparkSession): String = {
+ tablePath: String)(sparkSession: SparkSession): String = {
+ val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
val schemaConverter = new ThriftWrapperSchemaConverterImpl
val wrapperTableInfo = schemaConverter
.fromExternalToWrapperTableInfo(thriftTableInfo,
carbonTableIdentifier.getDatabaseName,
carbonTableIdentifier.getTableName,
- carbonStorePath)
+ tableIdentifier.getStorePath)
val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history
evolutionEntries.remove(evolutionEntries.size() - 1)
- createSchemaThriftFile(wrapperTableInfo,
+ wrapperTableInfo.setStorePath(tableIdentifier.getStorePath)
+ val path = createSchemaThriftFile(wrapperTableInfo,
thriftTableInfo,
- carbonTableIdentifier.getDatabaseName,
- carbonTableIdentifier.getTableName)(sparkSession)
+ tableIdentifier.getCarbonTableIdentifier)
+ addTableCache(wrapperTableInfo, tableIdentifier)
+ path
}
@@ -296,24 +269,38 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
* Load CarbonTable from wrapper tableInfo
*
*/
- def createTableFromThrift(
- tableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo,
- dbName: String, tableName: String)(sparkSession: SparkSession): (String, String) = {
- if (tableExists(tableName, Some(dbName))(sparkSession)) {
- sys.error(s"Table [$tableName] already exists under Database [$dbName]")
- }
- val schemaEvolutionEntry = new SchemaEvolutionEntry(tableInfo.getLastUpdatedTime)
+ def saveToDisk(tableInfo: schema.table.TableInfo, tablePath: String) {
val schemaConverter = new ThriftWrapperSchemaConverterImpl
+ val dbName = tableInfo.getDatabaseName
+ val tableName = tableInfo.getFactTable.getTableName
val thriftTableInfo = schemaConverter
.fromWrapperToExternalTableInfo(tableInfo, dbName, tableName)
- thriftTableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history
- .add(schemaEvolutionEntry)
- val carbonTablePath = createSchemaThriftFile(tableInfo,
+ val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+ tableInfo.setStorePath(identifier.getStorePath)
+ createSchemaThriftFile(tableInfo,
thriftTableInfo,
- dbName,
- tableName)(sparkSession)
+ identifier.getCarbonTableIdentifier)
LOGGER.info(s"Table $tableName for Database $dbName created successfully.")
- (carbonTablePath, "")
+ }
+
+ /**
+ * Generates schema string from TableInfo
+ */
+ override def generateTableSchemaString(tableInfo: schema.table.TableInfo,
+ tablePath: String): String = {
+ val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(tableIdentifier)
+ val schemaMetadataPath =
+ CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
+ tableInfo.setMetaDataFilepath(schemaMetadataPath)
+ tableInfo.setStorePath(tableIdentifier.getStorePath)
+ val schemaEvolutionEntry = new schema.SchemaEvolutionEntry
+ schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
+ tableInfo.getFactTable.getSchemaEvalution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
+ removeTableFromMetadata(tableInfo.getDatabaseName, tableInfo.getFactTable.getTableName)
+ CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
+ addTableCache(tableInfo, tableIdentifier)
+ CarbonUtil.convertToMultiGsonStrings(tableInfo, " ", "", ",")
}
/**
@@ -321,23 +308,16 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
*
* @param tableInfo
* @param thriftTableInfo
- * @param dbName
- * @param tableName
- * @param sparkSession
* @return
*/
- private def createSchemaThriftFile(
- tableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo,
- thriftTableInfo: org.apache.carbondata.format.TableInfo,
- dbName: String, tableName: String)
- (sparkSession: SparkSession): String = {
- val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName,
- tableInfo.getFactTable.getTableId)
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
+ private def createSchemaThriftFile(tableInfo: schema.table.TableInfo,
+ thriftTableInfo: TableInfo,
+ carbonTableIdentifier: CarbonTableIdentifier): String = {
+ val carbonTablePath = CarbonStorePath.
+ getCarbonTablePath(tableInfo.getStorePath, carbonTableIdentifier)
val schemaFilePath = carbonTablePath.getSchemaFilePath
val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath)
tableInfo.setMetaDataFilepath(schemaMetadataPath)
- tableInfo.setStorePath(storePath)
val fileType = FileFactory.getFileType(schemaMetadataPath)
if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
FileFactory.mkdirs(schemaMetadataPath, fileType)
@@ -346,13 +326,20 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
thriftWriter.open(FileWriteOperation.OVERWRITE)
thriftWriter.write(thriftTableInfo)
thriftWriter.close()
- removeTableFromMetadata(dbName, tableName)
+ updateSchemasUpdatedTime(touchSchemaFileSystemTime(tableInfo.getStorePath))
+ carbonTablePath.getPath
+ }
+
+ protected def addTableCache(tableInfo: table.TableInfo,
+ absoluteTableIdentifier: AbsoluteTableIdentifier) = {
+ val identifier = absoluteTableIdentifier.getCarbonTableIdentifier
+ CarbonMetadata.getInstance.removeTable(tableInfo.getTableUniqueName)
+ removeTableFromMetadata(identifier.getDatabaseName, identifier.getTableName)
CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
- val tableMeta = new TableMeta(carbonTableIdentifier, storePath,
- CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName))
+ val tableMeta = new TableMeta(identifier, absoluteTableIdentifier.getStorePath,
+ absoluteTableIdentifier.getTablePath,
+ CarbonMetadata.getInstance().getCarbonTable(identifier.getTableUniqueName))
metadata.tablesMeta += tableMeta
- updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
- carbonTablePath.getPath
}
/**
@@ -362,13 +349,15 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
* @param tableName
*/
def removeTableFromMetadata(dbName: String, tableName: String): Unit = {
- val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName, tableName)
+ val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadataCache(dbName, tableName)
metadataToBeRemoved match {
case Some(tableMeta) =>
metadata.tablesMeta -= tableMeta
CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName)
case None =>
- LOGGER.debug(s"No entry for table $tableName in database $dbName")
+ if (LOGGER.isDebugEnabled) {
+ LOGGER.debug(s"No entry for table $tableName in database $dbName")
+ }
}
}
@@ -402,23 +391,23 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
def isTablePathExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
- val dbName = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
- val tableName = tableIdentifier.table.toLowerCase
-
- val tablePath = CarbonStorePath.getCarbonTablePath(this.storePath,
- new CarbonTableIdentifier(dbName, tableName, "")).getPath
-
- val fileType = FileFactory.getFileType(tablePath)
- FileFactory.isFileExist(tablePath, fileType)
+ try {
+ val tablePath = lookupRelation(tableIdentifier)(sparkSession).
+ asInstanceOf[CarbonRelation].tableMeta.tablePath
+ val fileType = FileFactory.getFileType(tablePath)
+ FileFactory.isFileExist(tablePath, fileType)
+ } catch {
+ case e: Exception =>
+ false
+ }
}
- def dropTable(tableStorePath: String, tableIdentifier: TableIdentifier)
+ def dropTable(tablePath: String, tableIdentifier: TableIdentifier)
(sparkSession: SparkSession) {
val dbName = tableIdentifier.database.get
val tableName = tableIdentifier.table
-
- val metadataFilePath = CarbonStorePath.getCarbonTablePath(tableStorePath,
- new CarbonTableIdentifier(dbName, tableName, "")).getMetadataDirectoryPath
+ val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+ val metadataFilePath = CarbonStorePath.getCarbonTablePath(identifier).getMetadataDirectoryPath
val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
if (null != carbonTable) {
// clear driver B-tree and dictionary cache
@@ -429,26 +418,18 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
if (FileFactory.isFileExist(metadataFilePath, fileType)) {
// while drop we should refresh the schema modified time so that if any thing has changed
// in the other beeline need to update.
- checkSchemasModifiedTimeAndReloadTables
-
- val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName,
- tableIdentifier.table)
- metadataToBeRemoved match {
- case Some(tableMeta) =>
- metadata.tablesMeta -= tableMeta
- CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName)
- updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
- case None =>
- LOGGER.info(s"Metadata does not contain entry for table $tableName in database $dbName")
- }
+ checkSchemasModifiedTimeAndReloadTables(identifier.getStorePath)
+
+ removeTableFromMetadata(dbName, tableName)
+ updateSchemasUpdatedTime(touchSchemaFileSystemTime(identifier.getStorePath))
CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
// discard cached table info in cachedDataSourceTables
sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
}
}
- private def getTimestampFileAndType(databaseName: String, tableName: String) = {
- val timestampFile = storePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE
+ private def getTimestampFileAndType(basePath: String) = {
+ val timestampFile = basePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE
val timestampFileType = FileFactory.getFileType(timestampFile)
(timestampFile, timestampFileType)
}
@@ -462,37 +443,20 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
tableModifiedTimeStore.put("default", timeStamp)
}
- def updateAndTouchSchemasUpdatedTime(databaseName: String, tableName: String) {
- updateSchemasUpdatedTime(touchSchemaFileSystemTime(databaseName, tableName))
+ def updateAndTouchSchemasUpdatedTime(basePath: String) {
+ updateSchemasUpdatedTime(touchSchemaFileSystemTime(basePath))
}
- /**
- * This method will read the timestamp of empty schema file
- *
- * @param databaseName
- * @param tableName
- * @return
- */
- private def readSchemaFileSystemTime(databaseName: String, tableName: String): Long = {
- val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
- if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
- FileFactory.getCarbonFile(timestampFile, timestampFileType).getLastModifiedTime
- } else {
- System.currentTimeMillis()
- }
- }
/**
* This method will check and create an empty schema timestamp file
*
- * @param databaseName
- * @param tableName
* @return
*/
- private def touchSchemaFileSystemTime(databaseName: String, tableName: String): Long = {
- val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
+ private def touchSchemaFileSystemTime(basePath: String): Long = {
+ val (timestampFile, timestampFileType) = getTimestampFileAndType(basePath)
if (!FileFactory.isFileExist(timestampFile, timestampFileType)) {
- LOGGER.audit(s"Creating timestamp file for $databaseName.$tableName")
+ LOGGER.audit(s"Creating timestamp file for $basePath")
FileFactory.createNewFile(timestampFile, timestampFileType)
}
val systemTime = System.currentTimeMillis()
@@ -501,8 +465,9 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
systemTime
}
- def checkSchemasModifiedTimeAndReloadTables() {
- val (timestampFile, timestampFileType) = getTimestampFileAndType("", "")
+ def checkSchemasModifiedTimeAndReloadTables(storePath: String) {
+ val (timestampFile, timestampFileType) =
+ getTimestampFileAndType(storePath)
if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
if (!(FileFactory.getCarbonFile(timestampFile, timestampFileType).
getLastModifiedTime ==
@@ -513,7 +478,7 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
}
private def refreshCache() {
- metadata.tablesMeta = loadMetadata(storePath, nextQueryId).tablesMeta
+ metadata.tablesMeta.clear()
}
override def isReadFromHiveMetaStore: Boolean = false
@@ -527,4 +492,3 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
CarbonUtil.readSchemaFile(tableMetadataFile)
}
}
-