You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/11/13 22:12:01 UTC

[35/49] carbondata git commit: [CARBONDATA-1694] [BugFix] Resolved bug for Incorrect exception on presto CLI when a column is dropped from carbon

[CARBONDATA-1694] [BugFix] Resolved bug for Incorrect exception on presto CLI when a column is dropped from carbon

Steps to reproduce : same as https://issues.apache.org/jira/browse/CARBONDATA-1694

This closes #1486


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c6ca6409
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c6ca6409
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c6ca6409

Branch: refs/heads/fgdatamap
Commit: c6ca6409486d59a79230a7ea64f1e05bb1ce3d16
Parents: ccb6560
Author: anubhav100 <an...@knoldus.in>
Authored: Fri Nov 10 19:47:25 2017 +0530
Committer: chenliang613 <ch...@huawei.com>
Committed: Sat Nov 11 15:24:01 2017 +0800

----------------------------------------------------------------------
 .../presto/impl/CarbonTableReader.java          | 79 +++++++++++++-------
 1 file changed, 52 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/c6ca6409/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index 0fa7684..d61322d 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -41,6 +42,8 @@ import org.apache.carbondata.core.reader.ThriftReader;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.service.impl.PathFactory;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.events.OperationEventListener;
+import org.apache.carbondata.events.OperationListenerBus;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
 
@@ -109,10 +112,11 @@ public class CarbonTableReader {
    * @return
    */
   public CarbonTableCacheModel getCarbonCache(SchemaTableName table) {
+
     if (!cc.containsKey(table) || cc.get(table) == null) {
-      // if this table is not cached, try to read the metadata of the table and cache it.
+// if this table is not cached, try to read the metadata of the table and cache it.
       try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(
-          FileFactory.class.getClassLoader())) {
+              FileFactory.class.getClassLoader())) {
         if (carbonFileList == null) {
           fileType = FileFactory.getFileType(config.getStorePath());
           try {
@@ -125,7 +129,6 @@ public class CarbonTableReader {
       updateSchemaTables(table);
       parseCarbonMetadata(table);
     }
-
     if (cc.containsKey(table)) {
       return cc.get(table);
     } else {
@@ -133,6 +136,12 @@ public class CarbonTableReader {
     }
   }
 
+  private void removeTableFromCache(SchemaTableName table) {
+    DataMapStoreManager.getInstance().clearDataMap(cc.get(table).carbonTable.getAbsoluteTableIdentifier());
+    cc.remove(table);
+    tableList.remove(table);
+  }
+
   /**
    * Return the schema names under a schema store path (this.carbonFileList).
    *
@@ -192,11 +201,11 @@ public class CarbonTableReader {
    */
   private Set<String> updateTableList(String schemaName) {
     List<CarbonFile> schema =
-        Stream.of(carbonFileList.listFiles()).filter(a -> schemaName.equals(a.getName()))
-            .collect(Collectors.toList());
+            Stream.of(carbonFileList.listFiles()).filter(a -> schemaName.equals(a.getName()))
+                    .collect(Collectors.toList());
     if (schema.size() > 0) {
       return Stream.of((schema.get(0)).listFiles()).map(CarbonFile::getName)
-          .collect(Collectors.toSet());
+              .collect(Collectors.toSet());
     } else return ImmutableSet.of();
   }
 
@@ -225,10 +234,24 @@ public class CarbonTableReader {
    * is called, it clears this.tableList and populate the list by reading the files.
    */
   private void updateSchemaTables(SchemaTableName schemaTableName) {
-    // update logic determine later
+// update logic determine later
+    boolean isKeyExists = cc.containsKey(schemaTableName);
+
     if (carbonFileList == null) {
       updateSchemaList();
     }
+    try {
+      if(isKeyExists && !FileFactory.isFileExist(cc.get(schemaTableName).carbonTablePath.getSchemaFilePath(),fileType)){
+        removeTableFromCache(schemaTableName);
+        throw new TableNotFoundException(schemaTableName);
+      }
+    } catch (IOException e) {
+      e.printStackTrace();
+      throw new RuntimeException();
+    }
+    if(isKeyExists && FileFactory.getCarbonFile(cc.get(schemaTableName).carbonTablePath.getPath()).getLastModifiedTime() > cc.get(schemaTableName).tableInfo.getLastUpdatedTime()){
+      removeTableFromCache(schemaTableName);
+    }
     if(!tableList.contains(schemaTableName)) {
       for (CarbonFile cf : carbonFileList.listFiles()) {
         if (!cf.getName().endsWith(".mdt")) {
@@ -240,6 +263,7 @@ public class CarbonTableReader {
     }
   }
 
+
   /**
    * Find the table with the given name and build a CarbonTable instance for it.
    * This method should be called after this.updateSchemaTables().
@@ -274,11 +298,11 @@ public class CarbonTableReader {
       String storePath = config.getStorePath();
       // create table identifier. the table id is randomly generated.
       cache.carbonTableIdentifier =
-          new CarbonTableIdentifier(table.getSchemaName(), table.getTableName(),
-              UUID.randomUUID().toString());
+              new CarbonTableIdentifier(table.getSchemaName(), table.getTableName(),
+                      UUID.randomUUID().toString());
       // get the store path of the table.
       cache.carbonTablePath =
-          PathFactory.getInstance().getCarbonTablePath(storePath, cache.carbonTableIdentifier, null);
+              PathFactory.getInstance().getCarbonTablePath(storePath, cache.carbonTableIdentifier, null);
       // cache the table
       cc.put(table, cache);
 
@@ -292,27 +316,28 @@ public class CarbonTableReader {
         }
       };
       ThriftReader thriftReader =
-          new ThriftReader(cache.carbonTablePath.getSchemaFilePath(), createTBase);
+              new ThriftReader(cache.carbonTablePath.getSchemaFilePath(), createTBase);
       thriftReader.open();
       org.apache.carbondata.format.TableInfo tableInfo =
-          (org.apache.carbondata.format.TableInfo) thriftReader.read();
+              (org.apache.carbondata.format.TableInfo) thriftReader.read();
       thriftReader.close();
 
+
       // Step 3: convert format level TableInfo to code level TableInfo
       SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
       // wrapperTableInfo is the code level information of a table in carbondata core, different from the Thrift TableInfo.
       TableInfo wrapperTableInfo = schemaConverter
-          .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
-              storePath);
+              .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
+                      storePath);
       wrapperTableInfo.setMetaDataFilepath(
-          CarbonTablePath.getFolderContainingFile(cache.carbonTablePath.getSchemaFilePath()));
+              CarbonTablePath.getFolderContainingFile(cache.carbonTablePath.getSchemaFilePath()));
 
       // Step 4: Load metadata info into CarbonMetadata
       CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
 
       cache.tableInfo = wrapperTableInfo;
       cache.carbonTable = CarbonMetadata.getInstance()
-          .getCarbonTable(cache.carbonTableIdentifier.getTableUniqueName());
+              .getCarbonTable(cache.carbonTableIdentifier.getTableUniqueName());
       result = cache.carbonTable;
     } catch (Exception ex) {
       throw new RuntimeException(ex);
@@ -323,7 +348,7 @@ public class CarbonTableReader {
 
 
   public List<CarbonLocalInputSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
-      Expression filters)  {
+                                                     Expression filters)  {
     List<CarbonLocalInputSplit> result = new ArrayList<>();
 
     CarbonTable carbonTable = tableCacheModel.carbonTable;
@@ -331,14 +356,14 @@ public class CarbonTableReader {
     Configuration config = new Configuration();
     config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
     String carbonTablePath = PathFactory.getInstance()
-        .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier().getStorePath(),
-            carbonTable.getCarbonTableIdentifier(), null).getPath();
+            .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier().getStorePath(),
+                    carbonTable.getCarbonTableIdentifier(), null).getPath();
     config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
 
     try {
       CarbonTableInputFormat.setTableInfo(config, tableInfo);
       CarbonTableInputFormat carbonTableInputFormat =
-          createInputFormat(config, carbonTable.getAbsoluteTableIdentifier(), filters);
+              createInputFormat(config, carbonTable.getAbsoluteTableIdentifier(), filters);
       JobConf jobConf = new JobConf(config);
       Job job = Job.getInstance(jobConf);
       List<InputSplit> splits = carbonTableInputFormat.getSplits(job);
@@ -348,11 +373,11 @@ public class CarbonTableReader {
         for (InputSplit inputSplit : splits) {
           carbonInputSplit = (CarbonInputSplit) inputSplit;
           result.add(new CarbonLocalInputSplit(carbonInputSplit.getSegmentId(),
-              carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
-              carbonInputSplit.getLength(), Arrays.asList(carbonInputSplit.getLocations()),
-              carbonInputSplit.getNumberOfBlocklets(), carbonInputSplit.getVersion().number(),
-              carbonInputSplit.getDeleteDeltaFiles(),
-              gson.toJson(carbonInputSplit.getDetailInfo())));
+                  carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
+                  carbonInputSplit.getLength(), Arrays.asList(carbonInputSplit.getLocations()),
+                  carbonInputSplit.getNumberOfBlocklets(), carbonInputSplit.getVersion().number(),
+                  carbonInputSplit.getDeleteDeltaFiles(),
+                  gson.toJson(carbonInputSplit.getDetailInfo())));
         }
       }
 
@@ -364,10 +389,10 @@ public class CarbonTableReader {
   }
 
   private CarbonTableInputFormat<Object>  createInputFormat( Configuration conf, AbsoluteTableIdentifier identifier, Expression filterExpression)
-      throws IOException {
+          throws IOException {
     CarbonTableInputFormat format = new CarbonTableInputFormat<Object>();
     CarbonTableInputFormat.setTablePath(conf,
-        identifier.appendWithLocalPrefix(identifier.getTablePath()));
+            identifier.appendWithLocalPrefix(identifier.getTablePath()));
     CarbonTableInputFormat.setFilterPredicates(conf, filterExpression);
 
     return format;