You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2017/05/24 10:49:07 UTC

[4/5] carbondata git commit: refactor integration/presto

refactor integration/presto


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0988a847
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0988a847
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0988a847

Branch: refs/heads/master
Commit: 0988a847d4bbfe38a2741d32aaf7c9a32f7e7889
Parents: f32c503
Author: chenliang613 <ch...@huawei.com>
Authored: Sat Apr 8 15:07:47 2017 +0530
Committer: chenliang613 <ch...@apache.org>
Committed: Wed May 24 18:42:11 2017 +0800

----------------------------------------------------------------------
 .../presto/CarbondataConnectorFactory.java      | 12 ------
 .../carbondata/presto/CarbondataMetadata.java   | 34 ++++++++---------
 .../presto/CarbondataRecordSetProvider.java     | 12 +++---
 .../presto/impl/CarbonTableReader.java          | 40 +++++++++-----------
 4 files changed, 37 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/0988a847/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
index 66c007d..d1c8082 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
@@ -67,25 +67,13 @@ public class CarbondataConnectorFactory implements ConnectorFactory {
 
       LifeCycleManager lifeCycleManager = injector.getInstance(LifeCycleManager.class);
       CarbondataMetadata metadata = injector.getInstance(CarbondataMetadata.class);
-      //HiveTransactionManager transactionManager = injector.getInstance(HiveTransactionManager.class);
       ConnectorSplitManager splitManager = injector.getInstance(ConnectorSplitManager.class);
       ConnectorRecordSetProvider connectorRecordSet =
           injector.getInstance(ConnectorRecordSetProvider.class);
-      //ConnectorAccessControl accessControl = injector.getInstance(ConnectorAccessControl.class);
-
-      //ConnectorPageSourceProvider connectorPageSource = injector.getInstance(ConnectorPageSourceProvider.class);
-      //ConnectorPageSinkProvider pageSinkProvider = injector.getInstance(ConnectorPageSinkProvider.class);
-      //ConnectorNodePartitioningProvider connectorDistributionProvider = injector.getInstance(ConnectorNodePartitioningProvider.class);
-      //HiveSessionProperties hiveSessionProperties = injector.getInstance(HiveSessionProperties.class);
-      //HiveTableProperties hiveTableProperties = injector.getInstance(HiveTableProperties.class);
 
       return new CarbondataConnector(lifeCycleManager, metadata,
           new ClassLoaderSafeConnectorSplitManager(splitManager, classLoader), connectorRecordSet,
-          //new ClassLoaderSafeConnectorRecordSetProvider(, classLoader),
           classLoader
-          //new ClassLoaderSafeConnectorPageSourceProvider(connectorPageSource, classLoader),
-          //new ClassLoaderSafeConnectorPageSinkProvider(pageSinkProvider, classLoader),
-          //new ClassLoaderSafeNodePartitioningProvider(connectorDistributionProvider, classLoader),
       );
     } catch (Exception e) {
       throw Throwables.propagate(e);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0988a847/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
index d938a3d..f2d594a 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
@@ -59,11 +59,11 @@ public class CarbondataMetadata implements ConnectorMetadata {
   }
 
   public List<String> listSchemaNamesInternal() {
-    List<String> ret;
+    List<String> schemaNameList;
     try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
-      ret = carbonTableReader.getSchemaNames();
+      schemaNameList = carbonTableReader.getSchemaNames();
     }
-    return ret;
+    return schemaNameList;
   }
 
   @Override
@@ -109,27 +109,28 @@ public class CarbondataMetadata implements ConnectorMetadata {
     return ImmutableList.of(new SchemaTableName(prefix.getSchemaName(), prefix.getTableName()));
   }
 
-  private ConnectorTableMetadata getTableMetadata(SchemaTableName tableName) {
-    if (!listSchemaNamesInternal().contains(tableName.getSchemaName())) {
+  private ConnectorTableMetadata getTableMetadata(SchemaTableName schemaTableName) {
+    if (!listSchemaNamesInternal().contains(schemaTableName.getSchemaName())) {
       return null;
     }
 
-    CarbonTable cb = carbonTableReader.getTable(tableName);
-    if (cb == null) {
+    CarbonTable carbonTable = carbonTableReader.getTable(schemaTableName);
+    if (carbonTable == null) {
       return null;
     }
 
-    List<ColumnMetadata> spiCols = new LinkedList<>();
-    List<CarbonColumn> carbonColumns = cb.getCreateOrderColumn(tableName.getTableName());
+    List<ColumnMetadata> columnsMetaList = new LinkedList<>();
+    List<CarbonColumn> carbonColumns = carbonTable.getCreateOrderColumn(schemaTableName.getTableName());
     for (CarbonColumn col : carbonColumns) {
       //show columns command will return these data
-      Type spiType = CarbondataType2SpiMapper(col.getColumnSchema().getDataType());
-      ColumnMetadata spiCol = new ColumnMetadata(col.getColumnSchema().getColumnName(), spiType);
-      spiCols.add(spiCol);
+      Type columnType = CarbondataType2SpiMapper(col.getColumnSchema().getDataType());
+      ColumnMetadata columnMeta = new ColumnMetadata(col.getColumnSchema().getColumnName(),
+          columnType);
+      columnsMetaList.add(columnMeta);
     }
 
     //carbondata connector's table metadata
-    return new ConnectorTableMetadata(tableName, spiCols);
+    return new ConnectorTableMetadata(schemaTableName, columnsMetaList);
   }
 
   @Override public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session,
@@ -141,6 +142,7 @@ public class CarbondataMetadata implements ConnectorMetadata {
         "tableHandle is not for this connector");
 
     String schemaName = handle.getSchemaTableName().getSchemaName();
+
     if (!listSchemaNamesInternal().contains(schemaName)) {
       throw new SchemaNotFoundException(schemaName);
     }
@@ -250,12 +252,6 @@ public class CarbondataMetadata implements ConnectorMetadata {
         return DateType.DATE;
       case TIMESTAMP:
         return TimestampType.TIMESTAMP;
-
-            /*case DataType.MAP:
-            case DataType.ARRAY:
-            case DataType.STRUCT:
-            case DataType.NULL:*/
-
       default:
         return VarcharType.VARCHAR;
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0988a847/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
index 8b087df..f0958c7 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
@@ -61,8 +61,6 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
 
   @Inject
   public CarbondataRecordSetProvider(CarbondataConnectorId connectorId, CarbonTableReader reader) {
-    //this.config = requireNonNull(config, "config is null");
-    //this.connector = requireNonNull(connector, "connector is null");
     this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
     this.carbonTableReader = reader;
   }
@@ -72,9 +70,9 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
     requireNonNull(split, "split is null");
     requireNonNull(columns, "columns is null");
 
-    CarbondataSplit cdSplit =
+    CarbondataSplit carbondataSplit =
         checkType(split, CarbondataSplit.class, "split is not class CarbondataSplit");
-    checkArgument(cdSplit.getConnectorId().equals(connectorId), "split is not for this connector");
+    checkArgument(carbondataSplit.getConnectorId().equals(connectorId), "split is not for this connector");
 
     String targetCols = "";
     // Convert all columns handles
@@ -95,7 +93,7 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
     //String cols = String.join(",", columns.stream().map(a -> ((CarbondataColumnHandle)a).getColumnName()).collect(Collectors.toList()));
 
     CarbonTableCacheModel tableCacheModel =
-        carbonTableReader.getCarbonCache(cdSplit.getSchemaTableName());
+        carbonTableReader.getCarbonCache(carbondataSplit.getSchemaTableName());
     checkNotNull(tableCacheModel, "tableCacheModel should not be null");
     checkNotNull(tableCacheModel.carbonTable, "tableCacheModel.carbonTable should not be null");
     checkNotNull(tableCacheModel.tableInfo, "tableCacheModel.tableInfo should not be null");
@@ -107,10 +105,10 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
         QueryModel.createModel(targetTable.getAbsoluteTableIdentifier(), queryPlan, targetTable);
 
     // Push down filter
-    fillFilter2QueryModel(queryModel, cdSplit.getConstraints(), targetTable);
+    fillFilter2QueryModel(queryModel, carbondataSplit.getConstraints(), targetTable);
 
     // Return new record set
-    return new CarbondataRecordSet(targetTable, session, cdSplit,
+    return new CarbondataRecordSet(targetTable, session, carbondataSplit,
         handles.build(), queryModel);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0988a847/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index bb482b0..b6e45d6 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -52,18 +52,13 @@ import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.hadoop.CacheClient;
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.thrift.TBase;
 
-import java.io.BufferedReader;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.net.URI;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
@@ -82,7 +77,7 @@ public class CarbonTableReader {
 
   private CarbonTableConfig config;
   private List<SchemaTableName> tableList;
-  private CarbonFile dbStore;
+  private CarbonFile carbonFileList;
   private FileFactory.FileType fileType;
 
   // A cache for Carbon reader
@@ -98,10 +93,10 @@ public class CarbonTableReader {
     if (!cc.containsKey(table)) {
       try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(
           FileFactory.class.getClassLoader())) {
-        if (dbStore == null) {
+        if (carbonFileList == null) {
           fileType = FileFactory.getFileType(config.getStorePath());
           try {
-            dbStore = FileFactory.getCarbonFile(config.getStorePath(), fileType);
+            carbonFileList = FileFactory.getCarbonFile(config.getStorePath(), fileType);
           } catch (Exception ex) {
             throw new RuntimeException(ex);
           }
@@ -126,11 +121,11 @@ public class CarbonTableReader {
     }
   };
 
-  public boolean updateDbStore() {
-    if (dbStore == null) {
+  public boolean updateCarbonFile() {
+    if (carbonFileList == null) {
       fileType = FileFactory.getFileType(config.getStorePath());
       try {
-        dbStore = FileFactory.getCarbonFile(config.getStorePath(), fileType);
+        carbonFileList = FileFactory.getCarbonFile(config.getStorePath(), fileType);
       } catch (Exception ex) {
         throw new RuntimeException(ex);
       }
@@ -139,12 +134,12 @@ public class CarbonTableReader {
   }
 
   public List<String> updateSchemaList() {
-    updateDbStore();
+    updateCarbonFile();
 
-    if (dbStore != null) {
-      List<String> scs =
-          Stream.of(dbStore.listFiles()).map(a -> a.getName()).collect(Collectors.toList());
-      return scs;
+    if (carbonFileList != null) {
+      List<String> schemaList =
+          Stream.of(carbonFileList.listFiles()).map(a -> a.getName()).collect(Collectors.toList());
+      return schemaList;
     } else return ImmutableList.of();
   }
 
@@ -154,7 +149,7 @@ public class CarbonTableReader {
   }
 
   public Set<String> updateTableList(String dbName) {
-    List<CarbonFile> schema = Stream.of(dbStore.listFiles()).filter(a -> dbName.equals(a.getName()))
+    List<CarbonFile> schema = Stream.of(carbonFileList.listFiles()).filter(a -> dbName.equals(a.getName()))
         .collect(Collectors.toList());
     if (schema.size() > 0) {
       return Stream.of((schema.get(0)).listFiles()).map(a -> a.getName())
@@ -177,15 +172,14 @@ public class CarbonTableReader {
 
   public void updateSchemaTables() {
     // update logic determine later
-    if (dbStore == null) {
+    if (carbonFileList == null) {
       updateSchemaList();
     }
-
     tableList = new LinkedList<>();
-    for (CarbonFile db : dbStore.listFiles()) {
-      if (!db.getName().endsWith(".mdt")) {
-        for (CarbonFile table : db.listFiles()) {
-          tableList.add(new SchemaTableName(db.getName(), table.getName()));
+    for (CarbonFile cf : carbonFileList.listFiles()) {
+      if (!cf.getName().endsWith(".mdt")) {
+        for (CarbonFile table : cf.listFiles()) {
+          tableList.add(new SchemaTableName(cf.getName(), table.getName()));
         }
       }
     }