You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2017/05/24 10:49:07 UTC
[4/5] carbondata git commit: refactor integration/presto
refactor integration/presto
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0988a847
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0988a847
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0988a847
Branch: refs/heads/master
Commit: 0988a847d4bbfe38a2741d32aaf7c9a32f7e7889
Parents: f32c503
Author: chenliang613 <ch...@huawei.com>
Authored: Sat Apr 8 15:07:47 2017 +0530
Committer: chenliang613 <ch...@apache.org>
Committed: Wed May 24 18:42:11 2017 +0800
----------------------------------------------------------------------
.../presto/CarbondataConnectorFactory.java | 12 ------
.../carbondata/presto/CarbondataMetadata.java | 34 ++++++++---------
.../presto/CarbondataRecordSetProvider.java | 12 +++---
.../presto/impl/CarbonTableReader.java | 40 +++++++++-----------
4 files changed, 37 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0988a847/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
index 66c007d..d1c8082 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
@@ -67,25 +67,13 @@ public class CarbondataConnectorFactory implements ConnectorFactory {
LifeCycleManager lifeCycleManager = injector.getInstance(LifeCycleManager.class);
CarbondataMetadata metadata = injector.getInstance(CarbondataMetadata.class);
- //HiveTransactionManager transactionManager = injector.getInstance(HiveTransactionManager.class);
ConnectorSplitManager splitManager = injector.getInstance(ConnectorSplitManager.class);
ConnectorRecordSetProvider connectorRecordSet =
injector.getInstance(ConnectorRecordSetProvider.class);
- //ConnectorAccessControl accessControl = injector.getInstance(ConnectorAccessControl.class);
-
- //ConnectorPageSourceProvider connectorPageSource = injector.getInstance(ConnectorPageSourceProvider.class);
- //ConnectorPageSinkProvider pageSinkProvider = injector.getInstance(ConnectorPageSinkProvider.class);
- //ConnectorNodePartitioningProvider connectorDistributionProvider = injector.getInstance(ConnectorNodePartitioningProvider.class);
- //HiveSessionProperties hiveSessionProperties = injector.getInstance(HiveSessionProperties.class);
- //HiveTableProperties hiveTableProperties = injector.getInstance(HiveTableProperties.class);
return new CarbondataConnector(lifeCycleManager, metadata,
new ClassLoaderSafeConnectorSplitManager(splitManager, classLoader), connectorRecordSet,
- //new ClassLoaderSafeConnectorRecordSetProvider(, classLoader),
classLoader
- //new ClassLoaderSafeConnectorPageSourceProvider(connectorPageSource, classLoader),
- //new ClassLoaderSafeConnectorPageSinkProvider(pageSinkProvider, classLoader),
- //new ClassLoaderSafeNodePartitioningProvider(connectorDistributionProvider, classLoader),
);
} catch (Exception e) {
throw Throwables.propagate(e);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0988a847/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
index d938a3d..f2d594a 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
@@ -59,11 +59,11 @@ public class CarbondataMetadata implements ConnectorMetadata {
}
public List<String> listSchemaNamesInternal() {
- List<String> ret;
+ List<String> schemaNameList;
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
- ret = carbonTableReader.getSchemaNames();
+ schemaNameList = carbonTableReader.getSchemaNames();
}
- return ret;
+ return schemaNameList;
}
@Override
@@ -109,27 +109,28 @@ public class CarbondataMetadata implements ConnectorMetadata {
return ImmutableList.of(new SchemaTableName(prefix.getSchemaName(), prefix.getTableName()));
}
- private ConnectorTableMetadata getTableMetadata(SchemaTableName tableName) {
- if (!listSchemaNamesInternal().contains(tableName.getSchemaName())) {
+ private ConnectorTableMetadata getTableMetadata(SchemaTableName schemaTableName) {
+ if (!listSchemaNamesInternal().contains(schemaTableName.getSchemaName())) {
return null;
}
- CarbonTable cb = carbonTableReader.getTable(tableName);
- if (cb == null) {
+ CarbonTable carbonTable = carbonTableReader.getTable(schemaTableName);
+ if (carbonTable == null) {
return null;
}
- List<ColumnMetadata> spiCols = new LinkedList<>();
- List<CarbonColumn> carbonColumns = cb.getCreateOrderColumn(tableName.getTableName());
+ List<ColumnMetadata> columnsMetaList = new LinkedList<>();
+ List<CarbonColumn> carbonColumns = carbonTable.getCreateOrderColumn(schemaTableName.getTableName());
for (CarbonColumn col : carbonColumns) {
//show columns command will return these data
- Type spiType = CarbondataType2SpiMapper(col.getColumnSchema().getDataType());
- ColumnMetadata spiCol = new ColumnMetadata(col.getColumnSchema().getColumnName(), spiType);
- spiCols.add(spiCol);
+ Type columnType = CarbondataType2SpiMapper(col.getColumnSchema().getDataType());
+ ColumnMetadata columnMeta = new ColumnMetadata(col.getColumnSchema().getColumnName(),
+ columnType);
+ columnsMetaList.add(columnMeta);
}
//carbondata connector's table metadata
- return new ConnectorTableMetadata(tableName, spiCols);
+ return new ConnectorTableMetadata(schemaTableName, columnsMetaList);
}
@Override public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session,
@@ -141,6 +142,7 @@ public class CarbondataMetadata implements ConnectorMetadata {
"tableHandle is not for this connector");
String schemaName = handle.getSchemaTableName().getSchemaName();
+
if (!listSchemaNamesInternal().contains(schemaName)) {
throw new SchemaNotFoundException(schemaName);
}
@@ -250,12 +252,6 @@ public class CarbondataMetadata implements ConnectorMetadata {
return DateType.DATE;
case TIMESTAMP:
return TimestampType.TIMESTAMP;
-
- /*case DataType.MAP:
- case DataType.ARRAY:
- case DataType.STRUCT:
- case DataType.NULL:*/
-
default:
return VarcharType.VARCHAR;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0988a847/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
index 8b087df..f0958c7 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
@@ -61,8 +61,6 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
@Inject
public CarbondataRecordSetProvider(CarbondataConnectorId connectorId, CarbonTableReader reader) {
- //this.config = requireNonNull(config, "config is null");
- //this.connector = requireNonNull(connector, "connector is null");
this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
this.carbonTableReader = reader;
}
@@ -72,9 +70,9 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
requireNonNull(split, "split is null");
requireNonNull(columns, "columns is null");
- CarbondataSplit cdSplit =
+ CarbondataSplit carbondataSplit =
checkType(split, CarbondataSplit.class, "split is not class CarbondataSplit");
- checkArgument(cdSplit.getConnectorId().equals(connectorId), "split is not for this connector");
+ checkArgument(carbondataSplit.getConnectorId().equals(connectorId), "split is not for this connector");
String targetCols = "";
// Convert all columns handles
@@ -95,7 +93,7 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
//String cols = String.join(",", columns.stream().map(a -> ((CarbondataColumnHandle)a).getColumnName()).collect(Collectors.toList()));
CarbonTableCacheModel tableCacheModel =
- carbonTableReader.getCarbonCache(cdSplit.getSchemaTableName());
+ carbonTableReader.getCarbonCache(carbondataSplit.getSchemaTableName());
checkNotNull(tableCacheModel, "tableCacheModel should not be null");
checkNotNull(tableCacheModel.carbonTable, "tableCacheModel.carbonTable should not be null");
checkNotNull(tableCacheModel.tableInfo, "tableCacheModel.tableInfo should not be null");
@@ -107,10 +105,10 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
QueryModel.createModel(targetTable.getAbsoluteTableIdentifier(), queryPlan, targetTable);
// Push down filter
- fillFilter2QueryModel(queryModel, cdSplit.getConstraints(), targetTable);
+ fillFilter2QueryModel(queryModel, carbondataSplit.getConstraints(), targetTable);
// Return new record set
- return new CarbondataRecordSet(targetTable, session, cdSplit,
+ return new CarbondataRecordSet(targetTable, session, carbondataSplit,
handles.build(), queryModel);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0988a847/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index bb482b0..b6e45d6 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -52,18 +52,13 @@ import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.hadoop.CacheClient;
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
-import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.thrift.TBase;
-import java.io.BufferedReader;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.net.URI;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@@ -82,7 +77,7 @@ public class CarbonTableReader {
private CarbonTableConfig config;
private List<SchemaTableName> tableList;
- private CarbonFile dbStore;
+ private CarbonFile carbonFileList;
private FileFactory.FileType fileType;
// A cache for Carbon reader
@@ -98,10 +93,10 @@ public class CarbonTableReader {
if (!cc.containsKey(table)) {
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(
FileFactory.class.getClassLoader())) {
- if (dbStore == null) {
+ if (carbonFileList == null) {
fileType = FileFactory.getFileType(config.getStorePath());
try {
- dbStore = FileFactory.getCarbonFile(config.getStorePath(), fileType);
+ carbonFileList = FileFactory.getCarbonFile(config.getStorePath(), fileType);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
@@ -126,11 +121,11 @@ public class CarbonTableReader {
}
};
- public boolean updateDbStore() {
- if (dbStore == null) {
+ public boolean updateCarbonFile() {
+ if (carbonFileList == null) {
fileType = FileFactory.getFileType(config.getStorePath());
try {
- dbStore = FileFactory.getCarbonFile(config.getStorePath(), fileType);
+ carbonFileList = FileFactory.getCarbonFile(config.getStorePath(), fileType);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
@@ -139,12 +134,12 @@ public class CarbonTableReader {
}
public List<String> updateSchemaList() {
- updateDbStore();
+ updateCarbonFile();
- if (dbStore != null) {
- List<String> scs =
- Stream.of(dbStore.listFiles()).map(a -> a.getName()).collect(Collectors.toList());
- return scs;
+ if (carbonFileList != null) {
+ List<String> schemaList =
+ Stream.of(carbonFileList.listFiles()).map(a -> a.getName()).collect(Collectors.toList());
+ return schemaList;
} else return ImmutableList.of();
}
@@ -154,7 +149,7 @@ public class CarbonTableReader {
}
public Set<String> updateTableList(String dbName) {
- List<CarbonFile> schema = Stream.of(dbStore.listFiles()).filter(a -> dbName.equals(a.getName()))
+ List<CarbonFile> schema = Stream.of(carbonFileList.listFiles()).filter(a -> dbName.equals(a.getName()))
.collect(Collectors.toList());
if (schema.size() > 0) {
return Stream.of((schema.get(0)).listFiles()).map(a -> a.getName())
@@ -177,15 +172,14 @@ public class CarbonTableReader {
public void updateSchemaTables() {
// update logic determine later
- if (dbStore == null) {
+ if (carbonFileList == null) {
updateSchemaList();
}
-
tableList = new LinkedList<>();
- for (CarbonFile db : dbStore.listFiles()) {
- if (!db.getName().endsWith(".mdt")) {
- for (CarbonFile table : db.listFiles()) {
- tableList.add(new SchemaTableName(db.getName(), table.getName()));
+ for (CarbonFile cf : carbonFileList.listFiles()) {
+ if (!cf.getName().endsWith(".mdt")) {
+ for (CarbonFile table : cf.listFiles()) {
+ tableList.add(new SchemaTableName(cf.getName(), table.getName()));
}
}
}