Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/07/04 14:05:05 UTC
[8/8] carbondata git commit: [CARBONDATA-1244] Polish docs and comments in presto integration
[CARBONDATA-1244] Polish docs and comments in presto integration
This closes #1131
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b699ee6f
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b699ee6f
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b699ee6f
Branch: refs/heads/encoding_override
Commit: b699ee6f72d280ff0969663f598769e66d8abdb9
Parents: bbb95ce
Author: bianhq <bi...@gmail.com>
Authored: Tue Jul 4 01:36:42 2017 +0800
Committer: chenliang613 <ch...@apache.org>
Committed: Tue Jul 4 11:52:19 2017 +0800
----------------------------------------------------------------------
integration/presto/README.md | 51 ++++--
.../presto/CarbondataConnectorFactory.java | 2 +-
.../presto/impl/CarbonLocalInputSplit.java | 13 +-
.../presto/impl/CarbonTableCacheModel.java | 2 +-
.../presto/impl/CarbonTableReader.java | 154 +++++++++++++++++--
5 files changed, 185 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b699ee6f/integration/presto/README.md
----------------------------------------------------------------------
diff --git a/integration/presto/README.md b/integration/presto/README.md
index 9935478..dc14cb0 100644
--- a/integration/presto/README.md
+++ b/integration/presto/README.md
@@ -20,14 +20,10 @@
Please follow the below steps to query carbondata in presto
### Config presto server
-* Download presto server 0.166 : https://repo1.maven.org/maven2/com/facebook/presto/presto-server/
-* Finish configuration as per https://prestodb.io/docs/current/installation/deployment.html
- for example:
+* Download the presto server (version 0.166 is recommended and supported): https://repo1.maven.org/maven2/com/facebook/presto/presto-server/
+* Complete the presto configuration by following https://prestodb.io/docs/current/installation/deployment.html.
+ A configuration example:
```
- carbondata.properties:
- connector.name=carbondata
- carbondata-store=/Users/apple/DEMO/presto_test/data
-
config.properties:
coordinator=true
node-scheduler.include-coordinator=true
@@ -57,30 +53,51 @@ Please follow the below steps to query carbondata in presto
node.id=ffffffff-ffff-ffff-ffff-ffffffffffff
node.data-dir=/Users/apple/DEMO/presto_test/data
```
-* config carbondata-connector for presto
+* Configure the carbondata connector for presto
- First:compile carbondata-presto integration module
+ Firstly: Compile carbondata, including the carbondata-presto integration module:
```
$ git clone https://github.com/apache/carbondata
- $ cd carbondata/integration/presto
- $ mvn clean package
+ $ cd carbondata
+ $ mvn -DskipTests -P{spark-version} -Dspark.version={spark-version-number} -Dhadoop.version={hadoop-version-number} clean package
+ ```
+ Replace the Spark and Hadoop versions with the versions used in your cluster.
+ For example, if you are using Spark 2.1.0 and Hadoop 2.7.2, compile with:
+ ```
+ mvn -DskipTests -Pspark-2.1 -Dspark.version=2.1.0 -Dhadoop.version=2.7.2 clean package
+ ```
+
+ Secondly: Create a folder named 'carbondata' under $PRESTO_HOME$/plugin and
+ copy all the jars from carbondata/integration/presto/target/carbondata-presto-x.x.x-SNAPSHOT
+ to $PRESTO_HOME$/plugin/carbondata.
+
+ Thirdly: Create a carbondata.properties file under $PRESTO_HOME$/etc/catalog/ containing the following contents:
```
- Second:create one folder "carbondata" under ./presto-server-0.166/plugin
- Third:copy all jar from ./carbondata/integration/presto/target/carbondata-presto-x.x.x-SNAPSHOT
- to ./presto-server-0.166/plugin/carbondata
+ connector.name=carbondata
+ carbondata-store={schema-store-path}
+ ```
+ Replace {schema-store-path} with the absolute path of the parent directory of the schema.
+ For example, if a schema named 'default' is stored in hdfs://namenode:9000/test/carbondata/default,
+ set carbondata-store=hdfs://namenode:9000/test/carbondata
+
+ If you update the jar files or configuration files, make sure you distribute them
+ to all the presto nodes and restart the presto server on each node. The updates do not take effect until the servers are restarted.
### Generate CarbonData file
-Please refer to quick start : https://github.com/apache/carbondata/blob/master/docs/quick-start-guide.md
+Please refer to the quick start guide: https://github.com/apache/carbondata/blob/master/docs/quick-start-guide.md.
+You can use Spark to create carbondata tables and load data into them with a LOAD DATA statement;
+the generated carbondata files can then be found under the store path.
### Query carbondata in CLI of presto
-* Download presto-cli-0.166-executable.jar
+* Download presto cli client following: https://prestodb.io/docs/current/installation/cli.html
* Start CLI:
```
- $ ./presto-cli-0.166-executable.jar --server localhost:8086 --catalog carbondata --schema default
+ $ ./presto --server localhost:8086 --catalog carbondata --schema default
```
+ Replace the hostname, port and schema name with your own.
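The catalog rule above says that carbondata-store is the parent directory of the schema directory. The following Java sketch shows how that value can be derived and written in the two-line catalog format. `CarbonCatalogConfigSketch`, `storePathFor` and `catalogFile` are hypothetical names used only for illustration and are not part of CarbonData or Presto. The sketch works purely on strings and never contacts HDFS.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

/**
 * Hypothetical helper (not part of CarbonData or Presto): derives the
 * carbondata-store value from a schema directory and renders the body of
 * $PRESTO_HOME$/etc/catalog/carbondata.properties.
 */
final class CarbonCatalogConfigSketch {

  /** Returns the parent directory of the schema directory, i.e. the store path. */
  static String storePathFor(String schemaDir) {
    String trimmed = schemaDir;
    // Drop trailing slashes so ".../carbondata/default/" and ".../carbondata/default" agree.
    while (trimmed.endsWith("/")) {
      trimmed = trimmed.substring(0, trimmed.length() - 1);
    }
    int lastSlash = trimmed.lastIndexOf('/');
    if (lastSlash <= 0) {
      throw new IllegalArgumentException("schema directory has no parent: " + schemaDir);
    }
    return trimmed.substring(0, lastSlash);
  }

  /** Renders the two catalog entries in java.util.Properties syntax. */
  static String catalogFile(String storePath) {
    return "connector.name=carbondata\n" + "carbondata-store=" + storePath + "\n";
  }

  public static void main(String[] args) throws IOException {
    String body = catalogFile(storePathFor("hdfs://namenode:9000/test/carbondata/default/"));
    System.out.print(body);
    // Parse the text back as a sanity check that the values survive as written.
    Properties props = new Properties();
    props.load(new StringReader(body));
    System.out.println(props.getProperty("carbondata-store")); // prints hdfs://namenode:9000/test/carbondata
  }
}
```

Values such as `hdfs://namenode:9000/...` can be parsed safely as property values because the key/value separator is the first unescaped `=` on the line.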
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b699ee6f/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
index d97f19e..d557920 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
@@ -71,7 +71,7 @@ public class CarbondataConnectorFactory implements ConnectorFactory {
ConnectorSplitManager splitManager = injector.getInstance(ConnectorSplitManager.class);
ConnectorRecordSetProvider connectorRecordSet =
injector.getInstance(ConnectorRecordSetProvider.class);
- ConnectorPageSourceProvider connectorPageSource = injector.getInstance(ConnectorPageSourceProvider.class);
+ ConnectorPageSourceProvider connectorPageSource = injector.getInstance(ConnectorPageSourceProvider.class);
return new CarbondataConnector(lifeCycleManager, metadata,
new ClassLoaderSafeConnectorSplitManager(splitManager, classLoader), connectorRecordSet,
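The factory above wraps the split manager in `ClassLoaderSafeConnectorSplitManager` together with the plugin `classLoader`. Separately, `getCarbonCache` in CarbonTableReader opens a `ThreadContextClassLoader` in a try-with-resources block. Both depend on temporarily swapping the thread's context class loader. The plain-Java sketch below illustrates that pattern. `ContextClassLoaderSwap` is a hypothetical stand-in and is not Presto's `ThreadContextClassLoader` class.

```java
import java.net.URL;
import java.net.URLClassLoader;

/**
 * Hypothetical stand-in for the ThreadContextClassLoader pattern: install a class
 * loader as the current thread's context loader for the duration of a
 * try-with-resources block and restore the previous one on close.
 */
final class ContextClassLoaderSwap implements AutoCloseable {
  private final Thread thread;
  private final ClassLoader original;

  ContextClassLoaderSwap(ClassLoader replacement) {
    this.thread = Thread.currentThread();
    this.original = thread.getContextClassLoader();
    thread.setContextClassLoader(replacement);
  }

  @Override
  public void close() {
    // Runs even if the guarded block throws, so the caller's loader is always restored.
    thread.setContextClassLoader(original);
  }

  public static void main(String[] args) {
    ClassLoader pluginLoader =
        new URLClassLoader(new URL[0], ContextClassLoaderSwap.class.getClassLoader());
    try (ContextClassLoaderSwap ignored = new ContextClassLoaderSwap(pluginLoader)) {
      // A delegated call (e.g. a split-manager method) would run here.
      System.out.println(Thread.currentThread().getContextClassLoader() == pluginLoader); // prints true
    }
    System.out.println(Thread.currentThread().getContextClassLoader() == pluginLoader); // prints false
  }
}
```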
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b699ee6f/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
index ba8d9b5..f0a8428 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
@@ -17,19 +17,22 @@
package org.apache.carbondata.presto.impl;
-import java.util.List;
-
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+
+/**
+ * CarbonLocalInputSplit represents a block, which contains a set of blocklets.
+ */
public class CarbonLocalInputSplit {
private static final long serialVersionUID = 3520344046772190207L;
private String segmentId;
private String path;
- private long start;
- private long length;
- private List<String> locations;
+ private long start; // the start offset of the block in a carbondata file.
+ private long length; // the length of the block.
+ private List<String> locations; // the locations of the block's replicas.
private short version;
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b699ee6f/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java
index 45755d1..2a4db14 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java
@@ -23,7 +23,7 @@ import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.util.path.CarbonTablePath;
/**
- * Caching Carbon meta(e.g. TableIdentifier, TablePath, TableInfo, CarbonTable) in Class CarbonTableReader
+ * Caches metadata of CarbonData (e.g. TableIdentifier, TablePath, TableInfo, CarbonTable) in class CarbonTableReader
* to speed up query
*/
public class CarbonTableCacheModel {
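CarbonTableReader stores CarbonTableCacheModel instances in a `ConcurrentHashMap` named `cc`, so a table's metadata is read from the file system only once. The sketch below illustrates the load-once idea using hypothetical names. It substitutes `ConcurrentHashMap.computeIfAbsent` for the approach in this commit, where the code calls `containsKey` before loading and `put` afterwards. The loader lambda stands in for reading the schema file.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/**
 * Hypothetical load-once metadata cache keyed by table name. computeIfAbsent
 * applies the loader at most once per key, even under concurrent access.
 */
final class MetadataCacheSketch<K, V> {
  private final ConcurrentHashMap<K, V> cache = new ConcurrentHashMap<>();
  private final Function<K, V> loader;

  MetadataCacheSketch(Function<K, V> loader) {
    this.loader = loader;
  }

  /** Returns the cached value, invoking the (slow) loader only on first access. */
  V get(K key) {
    return cache.computeIfAbsent(key, loader);
  }

  public static void main(String[] args) {
    AtomicInteger reads = new AtomicInteger();
    // The loader stands in for reading a table's schema file from the file system.
    MetadataCacheSketch<String, String> tables =
        new MetadataCacheSketch<>(name -> "metadata of " + name + " (read #" + reads.incrementAndGet() + ")");
    System.out.println(tables.get("default.sales")); // prints metadata of default.sales (read #1)
    System.out.println(tables.get("default.sales")); // prints metadata of default.sales (read #1)
    System.out.println("file-system reads: " + reads.get()); // prints file-system reads: 1
  }
}
```

One design consideration: with `containsKey` followed by `put`, two threads can both miss the cache and both read the metadata. `computeIfAbsent` closes that window, although it holds a bin lock while the loader runs.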
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b699ee6f/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index c328a64..54832f5 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -72,15 +72,31 @@ import static java.util.Objects.requireNonNull;
* 2:FileFactory, (physic table file)
* 3:CarbonCommonFactory, (offer some )
* 4:DictionaryFactory, (parse dictionary util)
+ *
+ * Currently, it is mainly used to parse metadata of tables under
+ * the configured carbondata-store path and filter the relevant
+ * input splits with the given query predicates.
*/
public class CarbonTableReader {
private CarbonTableConfig config;
+
+ /**
+ * The names of all the tables under the schema store path (this.carbonFileList).
+ */
private List<SchemaTableName> tableList;
+
+ /**
+ * carbonFileList represents the store path of the schema, which is configured as carbondata-store
+ * in the CarbonData catalog file ($PRESTO_HOME$/etc/catalog/carbondata.properties).
+ */
private CarbonFile carbonFileList;
private FileFactory.FileType fileType;
- // A cache for Carbon reader
+ /**
+ * A cache for the Carbon reader; with this cache,
+ * the metadata of a table is read from the file system only once.
+ */
private ConcurrentHashMap<SchemaTableName, CarbonTableCacheModel> cc;
@Inject public CarbonTableReader(CarbonTableConfig config) {
@@ -88,9 +104,14 @@ public class CarbonTableReader {
this.cc = new ConcurrentHashMap<>();
}
- // for worker node to initialize carbon metastore
+ /**
+ * Used by a presto worker node to initialize the metadata cache of a table.
+ * @param table the name of the table and schema.
+ * @return
+ */
public CarbonTableCacheModel getCarbonCache(SchemaTableName table) {
if (!cc.containsKey(table)) {
+ // if this table is not cached, try to read the metadata of the table and cache it.
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(
FileFactory.class.getClassLoader())) {
if (carbonFileList == null) {
@@ -110,17 +131,26 @@ public class CarbonTableReader {
else return null;
}
+ /**
+ * Return the schema names under a schema store path (this.carbonFileList).
+ * @return
+ */
public List<String> getSchemaNames() {
return updateSchemaList();
}
- // default PathFilter
+ // default PathFilter, accepts files in carbondata format (with .carbondata extension).
private static final PathFilter DefaultFilter = new PathFilter() {
@Override public boolean accept(Path path) {
return CarbonTablePath.isCarbonDataFile(path.getName());
}
};
+ /**
+ * Get the CarbonFile instance which represents the store path in the configuration, and assign it to
+ * this.carbonFileList.
+ * @return
+ */
public boolean updateCarbonFile() {
if (carbonFileList == null) {
fileType = FileFactory.getFileType(config.getStorePath());
@@ -133,6 +163,10 @@ public class CarbonTableReader {
return true;
}
+ /**
+ * Return the schema names under a schema store path (this.carbonFileList).
+ * @return
+ */
public List<String> updateSchemaList() {
updateCarbonFile();
@@ -143,13 +177,23 @@ public class CarbonTableReader {
} else return ImmutableList.of();
}
+ /**
+ * Get the names of the tables in the given schema.
+ * @param schema name of the schema
+ * @return
+ */
public Set<String> getTableNames(String schema) {
requireNonNull(schema, "schema is null");
return updateTableList(schema);
}
- public Set<String> updateTableList(String dbName) {
- List<CarbonFile> schema = Stream.of(carbonFileList.listFiles()).filter(a -> dbName.equals(a.getName()))
+ /**
+ * Get the names of the tables in the given schema.
+ * @param schemaName name of the schema
+ * @return
+ */
+ public Set<String> updateTableList(String schemaName) {
+ List<CarbonFile> schema = Stream.of(carbonFileList.listFiles()).filter(a -> schemaName.equals(a.getName()))
.collect(Collectors.toList());
if (schema.size() > 0) {
return Stream.of((schema.get(0)).listFiles()).map(a -> a.getName())
@@ -157,6 +201,11 @@ public class CarbonTableReader {
} else return ImmutableSet.of();
}
+ /**
+ * Get the CarbonTable instance of the given table.
+ * @param schemaTableName name of the given table.
+ * @return
+ */
public CarbonTable getTable(SchemaTableName schemaTableName) {
try {
updateSchemaTables();
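`updateTableList` above treats the configured store as a two-level directory tree. Entries directly under the store path are schemas, and entries under a schema directory are tables. The sketch below reproduces that lookup on the local file system. It uses `java.nio.file` in place of CarbonData's `CarbonFile`, and the class and method names are hypothetical. Like `updateTableList`, it returns every entry name without checking whether the entry is a directory, and an unknown schema yields an empty set.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical local sketch of the layout storePath/schema/table (not the CarbonFile API). */
final class StoreLayoutSketch {

  /** Sorted names of all entries directly under dir, or an empty set if dir is missing. */
  static Set<String> childNames(Path dir) throws IOException {
    if (!Files.isDirectory(dir)) {
      return Collections.emptySet();
    }
    try (Stream<Path> children = Files.list(dir)) {
      return children.map(p -> p.getFileName().toString())
          .collect(Collectors.toCollection(TreeSet::new));
    }
  }

  /** Schemas are the entries directly under the store path. */
  static Set<String> schemaNames(Path storePath) throws IOException {
    return childNames(storePath);
  }

  /** Tables are the entries under storePath/schemaName. */
  static Set<String> tableNames(Path storePath, String schemaName) throws IOException {
    return childNames(storePath.resolve(schemaName));
  }

  public static void main(String[] args) throws IOException {
    Path store = Files.createTempDirectory("carbon-store");
    Files.createDirectories(store.resolve("default").resolve("sales"));
    Files.createDirectories(store.resolve("default").resolve("orders"));
    System.out.println(schemaNames(store)); // prints [default]
    System.out.println(tableNames(store, "default")); // prints [orders, sales]
  }
}
```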
@@ -170,6 +219,11 @@ public class CarbonTableReader {
return table;
}
+ /**
+ * Find all the tables under the schema store path (this.carbonFileList)
+ * and cache all the table names in this.tableList. Note that whenever this method
+ * is called, it clears this.tableList and repopulates it by reading the files.
+ */
public void updateSchemaTables() {
// update logic determine later
if (carbonFileList == null) {
@@ -185,6 +239,12 @@ public class CarbonTableReader {
}
}
+ /**
+ * Find the table with the given name and build a CarbonTable instance for it.
+ * This method should be called after this.updateSchemaTables().
+ * @param schemaTableName name of the given table.
+ * @return
+ */
private CarbonTable loadTableMetadata(SchemaTableName schemaTableName) {
for (SchemaTableName table : tableList) {
if (!table.equals(schemaTableName)) continue;
@@ -195,7 +255,9 @@ public class CarbonTableReader {
}
/**
- * parse carbon metadata into cc(CarbonTableReader cache)
+ * Read the metadata of the given table and cache it in this.cc (CarbonTableReader cache).
+ * @param table name of the given table.
+ * @return the CarbonTable instance which contains all the needed metadata for a table.
*/
public CarbonTable parseCarbonMetadata(SchemaTableName table) {
CarbonTable result = null;
@@ -203,17 +265,25 @@ public class CarbonTableReader {
CarbonTableCacheModel cache = cc.getOrDefault(table, new CarbonTableCacheModel());
if (cache.isValid()) return cache.carbonTable;
- //Step1: get table meta path, load carbon table param
+ // If the table is not cached yet:
+
+ // Step 1: get store path of the table and cache it.
String storePath = config.getStorePath();
+ // Create the table identifier; the table ID is randomly generated.
cache.carbonTableIdentifier =
new CarbonTableIdentifier(table.getSchemaName(), table.getTableName(),
UUID.randomUUID().toString());
+ // Get the path of the table under the store path.
cache.carbonTablePath =
PathFactory.getInstance().getCarbonTablePath(storePath, cache.carbonTableIdentifier);
+ // cache the table
cc.put(table, cache);
- //Step2: check file existed? read schema file
+ // Step 2: read the metadata (tableInfo) of the table.
ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
+ // TBase is used to read and write thrift objects.
+ // TableInfo is a kind of TBase used to read and write table information.
+ // TableInfo is generated by thrift, see schema.thrift under format/src/main/thrift for details.
public TBase create() {
return new org.apache.carbondata.format.TableInfo();
}
@@ -225,14 +295,16 @@ public class CarbonTableReader {
(org.apache.carbondata.format.TableInfo) thriftReader.read();
thriftReader.close();
- // Step3: Transform Format Level TableInfo to Code Level TableInfo
+ // Step 3: convert format level TableInfo to code level TableInfo
SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+ // wrapperTableInfo is the code level information of a table in carbondata core, different from the Thrift TableInfo.
TableInfo wrapperTableInfo = schemaConverter
.fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
storePath);
wrapperTableInfo.setMetaDataFilepath(
CarbonTablePath.getFolderContainingFile(cache.carbonTablePath.getSchemaFilePath()));
- // Step4: Load metadata info into CarbonMetadata
+
+ // Step 4: Load metadata info into CarbonMetadata
CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
cache.tableInfo = wrapperTableInfo;
@@ -246,6 +318,13 @@ public class CarbonTableReader {
return result;
}
+ /**
+ * Apply filters to the table and get valid input splits of the table.
+ * @param tableCacheModel the cached metadata of the table
+ * @param filters the filter expression applied to the table
+ * @return
+ * @throws Exception
+ */
public List<CarbonLocalInputSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
Expression filters) throws Exception {
@@ -332,7 +411,16 @@ public class CarbonTableReader {
}
/**
- * get data blocks of given segment
+ * Get all the data blocks of a given segment.
+ * @param filterExpressionProcessor
+ * @param absoluteTableIdentifier
+ * @param tablePath
+ * @param resolver
+ * @param segmentId
+ * @param cacheClient
+ * @param updateStatusManager
+ * @return
+ * @throws IOException
*/
private List<DataRefNode> getDataBlocksOfSegment(
FilterExpressionProcessor filterExpressionProcessor,
@@ -380,6 +468,16 @@ public class CarbonTableReader {
return false;
}
+ /**
+ * Build and load the B-trees of the segment.
+ * @param absoluteTableIdentifier
+ * @param tablePath
+ * @param segmentId
+ * @param cacheClient
+ * @param updateStatusManager
+ * @return
+ * @throws IOException
+ */
private Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> getSegmentAbstractIndexs(/*JobContext job,*/
AbsoluteTableIdentifier absoluteTableIdentifier, CarbonTablePath tablePath, String segmentId,
CacheClient cacheClient, SegmentUpdateStatusManager updateStatusManager) throws IOException {
@@ -491,6 +589,13 @@ public class CarbonTableReader {
return false;
}
+ /**
+ * Get the input splits of a set of carbondata files.
+ * @param fileStatusList the file statuses of the set of carbondata files.
+ * @param targetSystem the (HDFS) FileSystem that stores the files.
+ * @return
+ * @throws IOException
+ */
private List<InputSplit> getSplit(List<FileStatus> fileStatusList, FileSystem targetSystem)
throws IOException {
@@ -501,6 +606,7 @@ public class CarbonTableReader {
while (true) {
while (true) {
while (split.hasNext()) {
+ // file is a carbondata file
FileStatus file = (FileStatus) split.next();
Path path = file.getPath();
long length = file.getLen();
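In the next hunk, the split loop continues to cut `splitSize`-byte splits as long as `bytesRemaining / splitSize > 1.1`. As a result, the last split can be up to 10% larger than `splitSize`, which avoids leaving a tiny tail split. The sketch below illustrates that arithmetic. It assumes, as Hadoop's `FileInputFormat` does, that any remaining bytes become one final split. Block locations and `makeSplit` are omitted, and `SplitSlopSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the 1.1 "split slop" loop; each split is {offset, length}. */
final class SplitSlopSketch {
  static final double SPLIT_SLOP = 1.1D;

  static List<long[]> computeSplits(long length, long splitSize) {
    List<long[]> splits = new ArrayList<>();
    long bytesRemaining = length;
    // Keep cutting full splits while more than 1.1 splits' worth of bytes remain.
    while ((double) bytesRemaining / (double) splitSize > SPLIT_SLOP) {
      splits.add(new long[] {length - bytesRemaining, splitSize});
      bytesRemaining -= splitSize;
    }
    // Assumed tail handling: the rest (at most 1.1 * splitSize bytes) becomes one split.
    if (bytesRemaining != 0) {
      splits.add(new long[] {length - bytesRemaining, bytesRemaining});
    }
    return splits;
  }

  public static void main(String[] args) {
    long mb = 1024L * 1024L;
    for (long[] s : computeSplits(300 * mb, 128 * mb)) {
      System.out.println("offset=" + s[0] / mb + "MB length=" + s[1] / mb + "MB");
    }
  }
}
```

For a 300 MB file with 128 MB splits, this produces splits at offsets 0, 128 and 256 MB, with the last split 44 MB long. A 140 MB file has a ratio of about 1.09, so it remains a single 140 MB split.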
@@ -520,7 +626,7 @@ public class CarbonTableReader {
int blkIndex;
for (
bytesRemaining = length;
- (double) bytesRemaining / (double) splitSize > 1.1D;
+ (double) bytesRemaining / (double) splitSize > 1.1D; // while more than 1.1 splits' worth of bytes remain.
bytesRemaining -= splitSize) {
blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
splits.add(this.makeSplit(path, length - bytesRemaining, splitSize,
@@ -552,6 +658,15 @@ public class CarbonTableReader {
return new String[] { "0" };
}
+ /**
+ * Get all file statuses of the carbondata files with a segmentId in segmentsToConsider
+ * under the tablePath, and add them to the result.
+ * @param segmentsToConsider
+ * @param tablePath
+ * @param result
+ * @return the FileSystem instance used by this method.
+ * @throws IOException
+ */
private FileSystem getFileStatusOfSegments(String[] segmentsToConsider, CarbonTablePath tablePath,
List<FileStatus> result) throws IOException {
String[] partitionsToConsider = getValidPartitions();
@@ -584,6 +699,7 @@ public class CarbonTableReader {
LocatedFileStatus stat = iter.next();
if (DefaultFilter.accept(stat.getPath())) {
if (stat.isDirectory()) {
+ // DefaultFilter accepts carbondata files.
addInputPathRecursively(result, fs, stat.getPath(), DefaultFilter);
} else {
result.add(stat);
@@ -598,6 +714,15 @@ public class CarbonTableReader {
return fs;
}
+ /**
+ * Get the FileStatus of all carbondata files under the path recursively,
+ * and add the file statuses to the result.
+ * @param result
+ * @param fs
+ * @param path
+ * @param inputFilter the filter used to determine whether a path is a carbondata file
+ * @throws IOException
+ */
protected void addInputPathRecursively(List<FileStatus> result, FileSystem fs, Path path,
PathFilter inputFilter) throws IOException {
RemoteIterator iter = fs.listLocatedStatus(path);
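`getFileStatusOfSegments` and `addInputPathRecursively` gather the carbondata files for the considered segments by walking directories and applying `DefaultFilter`. The local sketch below uses `java.nio.file.Files.walk` in place of Hadoop's `FileSystem.listLocatedStatus`. The `.carbondata` suffix check approximates `CarbonTablePath.isCarbonDataFile`, and the `Fact/Part0/Segment_0` directory in `main` is only illustrative. Note one difference from the code above, which tests `DefaultFilter` before calling `isDirectory()`: the sketch recurses into every directory and applies the filter to regular files only.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical local sketch of recursive carbondata file collection. */
final class CarbonFileWalkSketch {
  // Assumed approximation of CarbonTablePath.isCarbonDataFile.
  static final Predicate<Path> CARBONDATA_FILE =
      p -> p.getFileName().toString().endsWith(".carbondata");

  /** Walks root recursively and returns the sorted regular files accepted by filter. */
  static List<Path> collect(Path root, Predicate<Path> filter) throws IOException {
    try (Stream<Path> paths = Files.walk(root)) {
      return paths.filter(Files::isRegularFile).filter(filter).sorted().collect(Collectors.toList());
    }
  }

  public static void main(String[] args) throws IOException {
    Path table = Files.createTempDirectory("table");
    Path segment = Files.createDirectories(table.resolve("Fact").resolve("Part0").resolve("Segment_0"));
    Files.createFile(segment.resolve("part-0-0.carbondata")); // hypothetical file name
    Files.createFile(segment.resolve("0.carbonindex")); // rejected by the filter
    System.out.println(collect(table, CARBONDATA_FILE));
  }
}
```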
@@ -616,7 +741,10 @@ public class CarbonTableReader {
}
/**
- * get data blocks of given btree
+ * Get the data blocks of a B-tree. The root node of the B-tree is abstractIndex.dataRefNode.
+ * BTreeNode is a subclass of DataRefNode.
+ * @param abstractIndex
+ * @return
*/
private List<DataRefNode> getDataBlocksOfIndex(AbstractIndex abstractIndex) {
List<DataRefNode> blocks = new LinkedList<DataRefNode>();
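According to the Javadoc above, `getDataBlocksOfIndex` collects the data blocks of a B-tree rooted at `abstractIndex.dataRefNode`. If the leaves are linked to their right sibling, as in a B+-tree, the blocks can be listed by descending to the left-most leaf and then following the sibling links. The `Node` class below is hypothetical and is not CarbonData's `BTreeNode`/`DataRefNode` API.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

/** Hypothetical B+-tree-style sketch: leaves hold block ids and link to the next leaf. */
final class LeafChainSketch {
  static final class Node {
    final List<Node> children = new ArrayList<>();
    final String blockId; // only meaningful on leaves
    Node next; // link to the right sibling leaf

    Node(String blockId) {
      this.blockId = blockId;
    }

    boolean isLeaf() {
      return children.isEmpty();
    }
  }

  /** Descends to the left-most leaf, then follows next links to list every data block. */
  static List<String> dataBlocks(Node root) {
    Node node = root;
    while (!node.isLeaf()) {
      node = node.children.get(0);
    }
    List<String> blocks = new LinkedList<>();
    for (Node leaf = node; leaf != null; leaf = leaf.next) {
      blocks.add(leaf.blockId);
    }
    return blocks;
  }

  public static void main(String[] args) {
    Node a = new Node("block-0");
    Node b = new Node("block-1");
    Node c = new Node("block-2");
    a.next = b;
    b.next = c;
    Node left = new Node(null);
    Node right = new Node(null);
    Node root = new Node(null);
    left.children.add(a);
    left.children.add(b);
    right.children.add(c);
    root.children.add(left);
    root.children.add(right);
    System.out.println(dataBlocks(root)); // prints [block-0, block-1, block-2]
  }
}
```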