You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/11/11 12:02:46 UTC
[05/11] carbondata git commit: [CARBONDATA-1694] [BugFix] Resolved
bug for Incorrect exception on presto CLI when a column is dropped from
carbon
[CARBONDATA-1694] [BugFix] Resolved bug for Incorrect exception on presto CLI when a column is dropped from carbon
Steps to reproduce: same as described in https://issues.apache.org/jira/browse/CARBONDATA-1694
This closes #1486
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c6ca6409
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c6ca6409
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c6ca6409
Branch: refs/heads/pre-aggregate
Commit: c6ca6409486d59a79230a7ea64f1e05bb1ce3d16
Parents: ccb6560
Author: anubhav100 <an...@knoldus.in>
Authored: Fri Nov 10 19:47:25 2017 +0530
Committer: chenliang613 <ch...@huawei.com>
Committed: Sat Nov 11 15:24:01 2017 +0800
----------------------------------------------------------------------
.../presto/impl/CarbonTableReader.java | 79 +++++++++++++-------
1 file changed, 52 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c6ca6409/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index 0fa7684..d61322d 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -41,6 +42,8 @@ import org.apache.carbondata.core.reader.ThriftReader;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.service.impl.PathFactory;
import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.events.OperationEventListener;
+import org.apache.carbondata.events.OperationListenerBus;
import org.apache.carbondata.hadoop.CarbonInputSplit;
import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
@@ -109,10 +112,11 @@ public class CarbonTableReader {
* @return
*/
public CarbonTableCacheModel getCarbonCache(SchemaTableName table) {
+
if (!cc.containsKey(table) || cc.get(table) == null) {
- // if this table is not cached, try to read the metadata of the table and cache it.
+// if this table is not cached, try to read the metadata of the table and cache it.
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(
- FileFactory.class.getClassLoader())) {
+ FileFactory.class.getClassLoader())) {
if (carbonFileList == null) {
fileType = FileFactory.getFileType(config.getStorePath());
try {
@@ -125,7 +129,6 @@ public class CarbonTableReader {
updateSchemaTables(table);
parseCarbonMetadata(table);
}
-
if (cc.containsKey(table)) {
return cc.get(table);
} else {
@@ -133,6 +136,12 @@ public class CarbonTableReader {
}
}
+ private void removeTableFromCache(SchemaTableName table) {
+ DataMapStoreManager.getInstance().clearDataMap(cc.get(table).carbonTable.getAbsoluteTableIdentifier());
+ cc.remove(table);
+ tableList.remove(table);
+ }
+
/**
* Return the schema names under a schema store path (this.carbonFileList).
*
@@ -192,11 +201,11 @@ public class CarbonTableReader {
*/
private Set<String> updateTableList(String schemaName) {
List<CarbonFile> schema =
- Stream.of(carbonFileList.listFiles()).filter(a -> schemaName.equals(a.getName()))
- .collect(Collectors.toList());
+ Stream.of(carbonFileList.listFiles()).filter(a -> schemaName.equals(a.getName()))
+ .collect(Collectors.toList());
if (schema.size() > 0) {
return Stream.of((schema.get(0)).listFiles()).map(CarbonFile::getName)
- .collect(Collectors.toSet());
+ .collect(Collectors.toSet());
} else return ImmutableSet.of();
}
@@ -225,10 +234,24 @@ public class CarbonTableReader {
* is called, it clears this.tableList and populate the list by reading the files.
*/
private void updateSchemaTables(SchemaTableName schemaTableName) {
- // update logic determine later
+// update logic determine later
+ boolean isKeyExists = cc.containsKey(schemaTableName);
+
if (carbonFileList == null) {
updateSchemaList();
}
+ try {
+ if(isKeyExists && !FileFactory.isFileExist(cc.get(schemaTableName).carbonTablePath.getSchemaFilePath(),fileType)){
+ removeTableFromCache(schemaTableName);
+ throw new TableNotFoundException(schemaTableName);
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ throw new RuntimeException();
+ }
+ if(isKeyExists && FileFactory.getCarbonFile(cc.get(schemaTableName).carbonTablePath.getPath()).getLastModifiedTime() > cc.get(schemaTableName).tableInfo.getLastUpdatedTime()){
+ removeTableFromCache(schemaTableName);
+ }
if(!tableList.contains(schemaTableName)) {
for (CarbonFile cf : carbonFileList.listFiles()) {
if (!cf.getName().endsWith(".mdt")) {
@@ -240,6 +263,7 @@ public class CarbonTableReader {
}
}
+
/**
* Find the table with the given name and build a CarbonTable instance for it.
* This method should be called after this.updateSchemaTables().
@@ -274,11 +298,11 @@ public class CarbonTableReader {
String storePath = config.getStorePath();
// create table identifier. the table id is randomly generated.
cache.carbonTableIdentifier =
- new CarbonTableIdentifier(table.getSchemaName(), table.getTableName(),
- UUID.randomUUID().toString());
+ new CarbonTableIdentifier(table.getSchemaName(), table.getTableName(),
+ UUID.randomUUID().toString());
// get the store path of the table.
cache.carbonTablePath =
- PathFactory.getInstance().getCarbonTablePath(storePath, cache.carbonTableIdentifier, null);
+ PathFactory.getInstance().getCarbonTablePath(storePath, cache.carbonTableIdentifier, null);
// cache the table
cc.put(table, cache);
@@ -292,27 +316,28 @@ public class CarbonTableReader {
}
};
ThriftReader thriftReader =
- new ThriftReader(cache.carbonTablePath.getSchemaFilePath(), createTBase);
+ new ThriftReader(cache.carbonTablePath.getSchemaFilePath(), createTBase);
thriftReader.open();
org.apache.carbondata.format.TableInfo tableInfo =
- (org.apache.carbondata.format.TableInfo) thriftReader.read();
+ (org.apache.carbondata.format.TableInfo) thriftReader.read();
thriftReader.close();
+
// Step 3: convert format level TableInfo to code level TableInfo
SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
// wrapperTableInfo is the code level information of a table in carbondata core, different from the Thrift TableInfo.
TableInfo wrapperTableInfo = schemaConverter
- .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
- storePath);
+ .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
+ storePath);
wrapperTableInfo.setMetaDataFilepath(
- CarbonTablePath.getFolderContainingFile(cache.carbonTablePath.getSchemaFilePath()));
+ CarbonTablePath.getFolderContainingFile(cache.carbonTablePath.getSchemaFilePath()));
// Step 4: Load metadata info into CarbonMetadata
CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
cache.tableInfo = wrapperTableInfo;
cache.carbonTable = CarbonMetadata.getInstance()
- .getCarbonTable(cache.carbonTableIdentifier.getTableUniqueName());
+ .getCarbonTable(cache.carbonTableIdentifier.getTableUniqueName());
result = cache.carbonTable;
} catch (Exception ex) {
throw new RuntimeException(ex);
@@ -323,7 +348,7 @@ public class CarbonTableReader {
public List<CarbonLocalInputSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
- Expression filters) {
+ Expression filters) {
List<CarbonLocalInputSplit> result = new ArrayList<>();
CarbonTable carbonTable = tableCacheModel.carbonTable;
@@ -331,14 +356,14 @@ public class CarbonTableReader {
Configuration config = new Configuration();
config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
String carbonTablePath = PathFactory.getInstance()
- .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier().getStorePath(),
- carbonTable.getCarbonTableIdentifier(), null).getPath();
+ .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier().getStorePath(),
+ carbonTable.getCarbonTableIdentifier(), null).getPath();
config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
try {
CarbonTableInputFormat.setTableInfo(config, tableInfo);
CarbonTableInputFormat carbonTableInputFormat =
- createInputFormat(config, carbonTable.getAbsoluteTableIdentifier(), filters);
+ createInputFormat(config, carbonTable.getAbsoluteTableIdentifier(), filters);
JobConf jobConf = new JobConf(config);
Job job = Job.getInstance(jobConf);
List<InputSplit> splits = carbonTableInputFormat.getSplits(job);
@@ -348,11 +373,11 @@ public class CarbonTableReader {
for (InputSplit inputSplit : splits) {
carbonInputSplit = (CarbonInputSplit) inputSplit;
result.add(new CarbonLocalInputSplit(carbonInputSplit.getSegmentId(),
- carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
- carbonInputSplit.getLength(), Arrays.asList(carbonInputSplit.getLocations()),
- carbonInputSplit.getNumberOfBlocklets(), carbonInputSplit.getVersion().number(),
- carbonInputSplit.getDeleteDeltaFiles(),
- gson.toJson(carbonInputSplit.getDetailInfo())));
+ carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
+ carbonInputSplit.getLength(), Arrays.asList(carbonInputSplit.getLocations()),
+ carbonInputSplit.getNumberOfBlocklets(), carbonInputSplit.getVersion().number(),
+ carbonInputSplit.getDeleteDeltaFiles(),
+ gson.toJson(carbonInputSplit.getDetailInfo())));
}
}
@@ -364,10 +389,10 @@ public class CarbonTableReader {
}
private CarbonTableInputFormat<Object> createInputFormat( Configuration conf, AbsoluteTableIdentifier identifier, Expression filterExpression)
- throws IOException {
+ throws IOException {
CarbonTableInputFormat format = new CarbonTableInputFormat<Object>();
CarbonTableInputFormat.setTablePath(conf,
- identifier.appendWithLocalPrefix(identifier.getTablePath()));
+ identifier.appendWithLocalPrefix(identifier.getTablePath()));
CarbonTableInputFormat.setFilterPredicates(conf, filterExpression);
return format;