Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/07/05 00:44:58 UTC

[49/50] [abbrv] carbondata git commit: [CARBONDATA-1244] Polish docs and comments in presto integration

[CARBONDATA-1244] Polish docs and comments in presto integration

This closes #1131


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b699ee6f
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b699ee6f
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b699ee6f

Branch: refs/heads/streaming_ingest
Commit: b699ee6f72d280ff0969663f598769e66d8abdb9
Parents: bbb95ce
Author: bianhq <bi...@gmail.com>
Authored: Tue Jul 4 01:36:42 2017 +0800
Committer: chenliang613 <ch...@apache.org>
Committed: Tue Jul 4 11:52:19 2017 +0800

----------------------------------------------------------------------
 integration/presto/README.md                    |  51 ++++--
 .../presto/CarbondataConnectorFactory.java      |   2 +-
 .../presto/impl/CarbonLocalInputSplit.java      |  13 +-
 .../presto/impl/CarbonTableCacheModel.java      |   2 +-
 .../presto/impl/CarbonTableReader.java          | 154 +++++++++++++++++--
 5 files changed, 185 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/b699ee6f/integration/presto/README.md
----------------------------------------------------------------------
diff --git a/integration/presto/README.md b/integration/presto/README.md
index 9935478..dc14cb0 100644
--- a/integration/presto/README.md
+++ b/integration/presto/README.md
@@ -20,14 +20,10 @@
 Please follow the below steps to query carbondata in presto
 
 ### Config presto server
-* Download presto server 0.166 : https://repo1.maven.org/maven2/com/facebook/presto/presto-server/
-* Finish configuration as per https://prestodb.io/docs/current/installation/deployment.html
-  for example:
+* Download presto server (0.166 is suggested and supported): https://repo1.maven.org/maven2/com/facebook/presto/presto-server/
+* Finish the presto configuration following https://prestodb.io/docs/current/installation/deployment.html.
+  A configuration example:
   ```
-  carbondata.properties:
-  connector.name=carbondata
-  carbondata-store=/Users/apple/DEMO/presto_test/data
-  
   config.properties:
   coordinator=true
   node-scheduler.include-coordinator=true
@@ -57,30 +53,51 @@ Please follow the below steps to query carbondata in presto
   node.id=ffffffff-ffff-ffff-ffff-ffffffffffff
   node.data-dir=/Users/apple/DEMO/presto_test/data
   ```
-* config carbondata-connector for presto
+* Configure the carbondata connector for presto
   
-  First:compile carbondata-presto integration module
+  First, compile carbondata, including the carbondata-presto integration module:
   ```
   $ git clone https://github.com/apache/carbondata
-  $ cd carbondata/integration/presto
-  $ mvn clean package
+  $ cd carbondata
+  $ mvn -DskipTests -P{spark-version} -Dspark.version={spark-version-number} -Dhadoop.version={hadoop-version-number} clean package
+  ```
+  Replace the Spark and Hadoop versions with the versions used in your cluster.
+  For example, if you are using Spark 2.1.0 and Hadoop 2.7.2, compile with:
+  ```
+  mvn -DskipTests -Pspark-2.1 -Dspark.version=2.1.0 -Dhadoop.version=2.7.2 clean package
+  ```
+
+  Second, create a folder named 'carbondata' under $PRESTO_HOME$/plugin and
+  copy all jars from carbondata/integration/presto/target/carbondata-presto-x.x.x-SNAPSHOT
+  to $PRESTO_HOME$/plugin/carbondata.
+
+  Third, create a carbondata.properties file under $PRESTO_HOME$/etc/catalog/ with the following contents:
   ```
-  Second:create one folder "carbondata" under ./presto-server-0.166/plugin
-  Third:copy all jar from ./carbondata/integration/presto/target/carbondata-presto-x.x.x-SNAPSHOT
-        to ./presto-server-0.166/plugin/carbondata
+  connector.name=carbondata
+  carbondata-store={schema-store-path}
+  ```
+  Replace {schema-store-path} with the absolute path of the parent directory of the schema.
+  For example, if you have a schema named 'default' stored in hdfs://namenode:9000/test/carbondata/,
+  then set carbondata-store=hdfs://namenode:9000/test/carbondata, as illustrated below.
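+  For illustration, assuming a hypothetical table 'my_table' in that schema, the layout under the store path would be:
+  ```
+  hdfs://namenode:9000/test/carbondata    <-- carbondata-store
+  └── default                             <-- schema
+      └── my_table                        <-- table
+  ```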
+
+  If you update the jar files or configuration files, make sure you distribute them
+  to all the presto nodes and restart the presto servers on those nodes. The updates will not take effect until the servers are restarted.
   
 ### Generate CarbonData file
 
-Please refer to quick start : https://github.com/apache/carbondata/blob/master/docs/quick-start-guide.md
+Please refer to the quick start guide: https://github.com/apache/carbondata/blob/master/docs/quick-start-guide.md.
+The load data statement in Spark can be used to create carbondata tables, after which you can easily find the created
+carbondata files.
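+For example, a minimal sketch following the quick start guide (assuming a Spark session with CarbonData support; the table name and CSV path are hypothetical):
+```
+spark.sql("CREATE TABLE IF NOT EXISTS default.my_table(id INT, name STRING) STORED BY 'carbondata'")
+spark.sql("LOAD DATA INPATH 'hdfs://namenode:9000/data/sample.csv' INTO TABLE default.my_table")
+```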
 
 ### Query carbondata in CLI of presto
-* Download presto-cli-0.166-executable.jar
+* Download the presto CLI client following https://prestodb.io/docs/current/installation/cli.html
 
 * Start CLI:
   
   ```
-  $ ./presto-cli-0.166-executable.jar --server localhost:8086 --catalog carbondata --schema default
+  $ ./presto --server localhost:8086 --catalog carbondata --schema default
   ```
+  Replace the hostname, port and schema name with your own.
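+  Once connected, a quick sanity check (my_table is a hypothetical carbondata table under the 'default' schema):
+  ```
+  presto:default> show tables;
+  presto:default> select count(*) from my_table;
+  ```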
 
 
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b699ee6f/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
index d97f19e..d557920 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
@@ -71,7 +71,7 @@ public class CarbondataConnectorFactory implements ConnectorFactory {
       ConnectorSplitManager splitManager = injector.getInstance(ConnectorSplitManager.class);
       ConnectorRecordSetProvider connectorRecordSet =
           injector.getInstance(ConnectorRecordSetProvider.class);
-       ConnectorPageSourceProvider connectorPageSource = injector.getInstance(ConnectorPageSourceProvider.class);
+      ConnectorPageSourceProvider connectorPageSource = injector.getInstance(ConnectorPageSourceProvider.class);
 
       return new CarbondataConnector(lifeCycleManager, metadata,
           new ClassLoaderSafeConnectorSplitManager(splitManager, classLoader), connectorRecordSet,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b699ee6f/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
index ba8d9b5..f0a8428 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
@@ -17,19 +17,22 @@
 
 package org.apache.carbondata.presto.impl;
 
-import java.util.List;
-
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
+import java.util.List;
+
+/**
+ * CarbonLocalInputSplit represents a block; it contains a set of blocklets.
+ */
 public class CarbonLocalInputSplit {
 
   private static final long serialVersionUID = 3520344046772190207L;
   private String segmentId;
   private String path;
-  private long start;
-  private long length;
-  private List<String> locations;
+  private long start; // the start offset of the block in a carbondata file.
+  private long length; // the length of the block.
+  private List<String> locations; // the locations of the block's replicas.
   private short version;
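+  // For illustration, a split serialized through this class's Jackson annotations
+  // could look like (all values hypothetical):
+  // {"segmentId":"0","path":"hdfs://.../part-0-0.carbondata","start":0,
+  //  "length":134217728,"locations":["node1","node2"],"version":3}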
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b699ee6f/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java
index 45755d1..2a4db14 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java
@@ -23,7 +23,7 @@ import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 /**
- * Caching Carbon meta(e.g. TableIdentifier, TablePath, TableInfo, CarbonTable) in Class CarbonTableReader
+ * Caches CarbonData metadata (e.g. TableIdentifier, TablePath, TableInfo, CarbonTable) in class CarbonTableReader
  * to speed up query
  */
 public class CarbonTableCacheModel {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b699ee6f/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index c328a64..54832f5 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -72,15 +72,31 @@ import static java.util.Objects.requireNonNull;
  * 2:FileFactory, (physic table file)
  * 3:CarbonCommonFactory, (offer some )
  * 4:DictionaryFactory, (parse dictionary util)
+ *
+ * Currently, it is mainly used to parse metadata of tables under
+ * the configured carbondata-store path and filter the relevant
+ * input splits with given query predicates.
  */
 public class CarbonTableReader {
 
   private CarbonTableConfig config;
+
+  /**
+   * The names of the tables under the schema (this.carbonFileList).
+   */
   private List<SchemaTableName> tableList;
+
+  /**
+   * carbonFileList represents the store path of the schema, which is configured as carbondata-store
+   * in the CarbonData catalog file ($PRESTO_HOME$/etc/catalog/carbondata.properties).
+   */
   private CarbonFile carbonFileList;
   private FileFactory.FileType fileType;
 
-  // A cache for Carbon reader
+  /**
+   * A cache for the Carbon reader; with this cache,
+   * the metadata of a table is read from the file system only once.
+   */
   private ConcurrentHashMap<SchemaTableName, CarbonTableCacheModel> cc;
 
   @Inject public CarbonTableReader(CarbonTableConfig config) {
@@ -88,9 +104,14 @@ public class CarbonTableReader {
     this.cc = new ConcurrentHashMap<>();
   }
 
-  // for worker node to initialize carbon metastore
+  /**
+   * Called by a presto worker node to initialize the metadata cache of a table.
+   * @param table the schema and name of the table.
+   * @return the metadata cache model of the table, or null if it is not available.
+   */
   public CarbonTableCacheModel getCarbonCache(SchemaTableName table) {
     if (!cc.containsKey(table)) {
+      // if this table is not cached, try to read the metadata of the table and cache it.
       try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(
           FileFactory.class.getClassLoader())) {
         if (carbonFileList == null) {
@@ -110,17 +131,26 @@ public class CarbonTableReader {
     else return null;
   }
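+  // Usage sketch (hypothetical caller, not part of this patch): the first call
+  // for a table parses its schema file and fills the cache; later calls are
+  // served from memory.
+  //   CarbonTableCacheModel model = reader.getCarbonCache(
+  //       new SchemaTableName("default", "my_table"));
+  //   if (model != null && model.isValid()) {
+  //     CarbonTable carbonTable = model.carbonTable;
+  //   }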
 
+  /**
+   * Return the schema names under the schema store path (this.carbonFileList).
+   * @return the list of schema names.
+   */
   public List<String> getSchemaNames() {
     return updateSchemaList();
   }
 
-  // default PathFilter
+  // default PathFilter, accepts files in carbondata format (with .carbondata extension).
   private static final PathFilter DefaultFilter = new PathFilter() {
     @Override public boolean accept(Path path) {
       return CarbonTablePath.isCarbonDataFile(path.getName());
     }
   };
 
+  /**
+   * Get the CarbonFile instance which represents the store path in the configuration,
+   * and assign it to this.carbonFileList.
+   * @return true on success.
+   */
   public boolean updateCarbonFile() {
     if (carbonFileList == null) {
       fileType = FileFactory.getFileType(config.getStorePath());
@@ -133,6 +163,10 @@ public class CarbonTableReader {
     return true;
   }
 
+  /**
+   * Return the schema names under the schema store path (this.carbonFileList).
+   * @return the list of schema names.
+   */
   public List<String> updateSchemaList() {
     updateCarbonFile();
 
@@ -143,13 +177,23 @@ public class CarbonTableReader {
     } else return ImmutableList.of();
   }
 
+  /**
+   * Get the names of the tables in the given schema.
+   * @param schema name of the schema
+   * @return the set of table names.
+   */
   public Set<String> getTableNames(String schema) {
     requireNonNull(schema, "schema is null");
     return updateTableList(schema);
   }
 
-  public Set<String> updateTableList(String dbName) {
-    List<CarbonFile> schema = Stream.of(carbonFileList.listFiles()).filter(a -> dbName.equals(a.getName()))
+  /**
+   * Get the names of the tables in the given schema.
+   * @param schemaName name of the schema
+   * @return the set of table names.
+   */
+  public Set<String> updateTableList(String schemaName) {
+    List<CarbonFile> schema = Stream.of(carbonFileList.listFiles()).filter(a -> schemaName.equals(a.getName()))
         .collect(Collectors.toList());
     if (schema.size() > 0) {
       return Stream.of((schema.get(0)).listFiles()).map(a -> a.getName())
@@ -157,6 +201,11 @@ public class CarbonTableReader {
     } else return ImmutableSet.of();
   }
 
+  /**
+   * Get the CarbonTable instance of the given table.
+   * @param schemaTableName name of the given table.
+   * @return the CarbonTable instance of the table.
+   */
   public CarbonTable getTable(SchemaTableName schemaTableName) {
     try {
       updateSchemaTables();
@@ -170,6 +219,11 @@ public class CarbonTableReader {
     return table;
   }
 
+  /**
+   * Find all the tables under the schema store path (this.carbonFileList)
+   * and cache all the table names in this.tableList. Note that whenever this method
+   * is called, it clears this.tableList and repopulates it by reading the files.
+   */
   public void updateSchemaTables() {
     // update logic determine later
     if (carbonFileList == null) {
@@ -185,6 +239,12 @@ public class CarbonTableReader {
     }
   }
 
+  /**
+   * Find the table with the given name and build a CarbonTable instance for it.
+   * This method should be called after this.updateSchemaTables().
+   * @param schemaTableName name of the given table.
+   * @return the CarbonTable instance of the table.
+   */
   private CarbonTable loadTableMetadata(SchemaTableName schemaTableName) {
     for (SchemaTableName table : tableList) {
       if (!table.equals(schemaTableName)) continue;
@@ -195,7 +255,9 @@ public class CarbonTableReader {
   }
 
   /**
-   * parse carbon metadata into cc(CarbonTableReader cache)
+   * Read the metadata of the given table and cache it in this.cc (CarbonTableReader cache).
+   * @param table name of the given table.
+   * @return the CarbonTable instance which contains all the needed metadata for a table.
    */
   public CarbonTable parseCarbonMetadata(SchemaTableName table) {
     CarbonTable result = null;
@@ -203,17 +265,25 @@ public class CarbonTableReader {
       CarbonTableCacheModel cache = cc.getOrDefault(table, new CarbonTableCacheModel());
       if (cache.isValid()) return cache.carbonTable;
 
-      //Step1: get table meta path, load carbon table param
+      // If table is not previously cached, then:
+
+      // Step 1: get store path of the table and cache it.
       String storePath = config.getStorePath();
+      // create the table identifier; the table id is randomly generated.
       cache.carbonTableIdentifier =
           new CarbonTableIdentifier(table.getSchemaName(), table.getTableName(),
               UUID.randomUUID().toString());
+      // get the store path of the table.
       cache.carbonTablePath =
           PathFactory.getInstance().getCarbonTablePath(storePath, cache.carbonTableIdentifier);
+      // cache the table.
       cc.put(table, cache);
 
-      //Step2: check file existed? read schema file
+      // Step 2: read the metadata (tableInfo) of the table.
       ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
+        // TBase is used to read and write thrift objects.
+        // TableInfo is a kind of TBase used to read and write table information.
+        // TableInfo is generated by thrift, see schema.thrift under format/src/main/thrift for details.
         public TBase create() {
           return new org.apache.carbondata.format.TableInfo();
         }
@@ -225,14 +295,16 @@ public class CarbonTableReader {
           (org.apache.carbondata.format.TableInfo) thriftReader.read();
       thriftReader.close();
 
-      // Step3: Transform Format Level TableInfo to Code Level TableInfo
+      // Step 3: convert format level TableInfo to code level TableInfo
       SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+      // wrapperTableInfo is the code level information of a table in carbondata core, different from the Thrift TableInfo.
       TableInfo wrapperTableInfo = schemaConverter
           .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
               storePath);
       wrapperTableInfo.setMetaDataFilepath(
           CarbonTablePath.getFolderContainingFile(cache.carbonTablePath.getSchemaFilePath()));
-      // Step4: Load metadata info into CarbonMetadata
+
+      // Step 4: Load metadata info into CarbonMetadata
       CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
 
       cache.tableInfo = wrapperTableInfo;
@@ -246,6 +318,13 @@ public class CarbonTableReader {
     return result;
   }
 
+  /**
+   * Apply filters to the table and get the valid input splits of the table.
+   * @param tableCacheModel the metadata cache model of the table.
+   * @param filters the filter expression of the query.
+   * @return the valid input splits of the table.
+   * @throws Exception
+   */
   public List<CarbonLocalInputSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
       Expression filters) throws Exception {
 
@@ -332,7 +411,16 @@ public class CarbonTableReader {
   }
 
   /**
-   * get data blocks of given segment
+   * Get all the data blocks of a given segment.
+   * @param filterExpressionProcessor the processor that evaluates the filter expression.
+   * @param absoluteTableIdentifier the identifier of the table.
+   * @param tablePath the store path of the table.
+   * @param resolver the resolver tree of the filter expression.
+   * @param segmentId the id of the segment.
+   * @param cacheClient the client of the segment index cache.
+   * @param updateStatusManager the status manager of the segment updates.
+   * @return the data blocks of the segment.
+   * @throws IOException
    */
   private List<DataRefNode> getDataBlocksOfSegment(
       FilterExpressionProcessor filterExpressionProcessor,
@@ -380,6 +468,16 @@ public class CarbonTableReader {
     return false;
   }
 
+  /**
+   * Build and load the B-trees of the segment.
+   * @param absoluteTableIdentifier the identifier of the table.
+   * @param tablePath the store path of the table.
+   * @param segmentId the id of the segment.
+   * @param cacheClient the client of the segment index cache.
+   * @param updateStatusManager the status manager of the segment updates.
+   * @return a map from task bucket to the loaded index of the task.
+   * @throws IOException
+   */
   private Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> getSegmentAbstractIndexs(/*JobContext job,*/
       AbsoluteTableIdentifier absoluteTableIdentifier, CarbonTablePath tablePath, String segmentId,
       CacheClient cacheClient, SegmentUpdateStatusManager updateStatusManager) throws IOException {
@@ -491,6 +589,13 @@ public class CarbonTableReader {
     return false;
   }
 
+  /**
+   * Get the input splits of a set of carbondata files.
+   * @param fileStatusList the file statuses of the set of carbondata files.
+   * @param targetSystem hdfs FileSystem
+   * @return the input splits of the files.
+   * @throws IOException
+   */
   private List<InputSplit> getSplit(List<FileStatus> fileStatusList, FileSystem targetSystem)
       throws IOException {
 
@@ -501,6 +606,7 @@ public class CarbonTableReader {
     while (true) {
       while (true) {
         while (split.hasNext()) {
+          // file is a carbondata file
           FileStatus file = (FileStatus) split.next();
           Path path = file.getPath();
           long length = file.getLen();
@@ -520,7 +626,7 @@ public class CarbonTableReader {
               int blkIndex;
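+              // Worked example (assumed figures): for a 1000 MB file with
+              // splitSize = 256 MB, this loop emits 256 MB splits at offsets
+              // 0, 256 and 512; it stops when 232 MB remain (232/256 <= 1.1),
+              // and the remaining tail is expected to become the last split.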
               for (
                   bytesRemaining = length;
-                  (double) bytesRemaining / (double) splitSize > 1.1D;
+                  (double) bytesRemaining / (double) splitSize > 1.1D; // while more than one split's worth of data remains.
                   bytesRemaining -= splitSize) {
                 blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
                 splits.add(this.makeSplit(path, length - bytesRemaining, splitSize,
@@ -552,6 +658,15 @@ public class CarbonTableReader {
     return new String[] { "0" };
   }
 
+  /**
+   * Get the file statuses of all carbondata files whose segment id is in segmentsToConsider
+   * under the tablePath, and add them to the result.
+   * @param segmentsToConsider the ids of the segments to consider.
+   * @param tablePath the store path of the table.
+   * @param result the list that the found file statuses are added to.
+   * @return the FileSystem instance used in this function.
+   * @throws IOException
+   */
   private FileSystem getFileStatusOfSegments(String[] segmentsToConsider, CarbonTablePath tablePath,
       List<FileStatus> result) throws IOException {
     String[] partitionsToConsider = getValidPartitions();
@@ -584,6 +699,7 @@ public class CarbonTableReader {
             LocatedFileStatus stat = iter.next();
             if (DefaultFilter.accept(stat.getPath())) {
               if (stat.isDirectory()) {
+                // DefaultFilter accepts carbondata files.
                 addInputPathRecursively(result, fs, stat.getPath(), DefaultFilter);
               } else {
                 result.add(stat);
@@ -598,6 +714,15 @@ public class CarbonTableReader {
     return fs;
   }
 
+  /**
+   * Get the FileStatus of all carbondata files under the path recursively,
+   * and add the file statuses to the result.
+   * @param result the list that the found file statuses are added to.
+   * @param fs the file system to search in.
+   * @param path the root path of the search.
+   * @param inputFilter the filter used to determine whether a path is a carbondata file.
+   * @throws IOException
+   */
   protected void addInputPathRecursively(List<FileStatus> result, FileSystem fs, Path path,
       PathFilter inputFilter) throws IOException {
     RemoteIterator iter = fs.listLocatedStatus(path);
@@ -616,7 +741,10 @@ public class CarbonTableReader {
   }
 
   /**
-   * get data blocks of given btree
+   * Get the data blocks of a B-tree. The root node of the B-tree is abstractIndex.dataRefNode.
+   * BTreeNode is a subclass of DataRefNode.
+   * @param abstractIndex the index whose B-tree is to be traversed.
+   * @return the data blocks of the B-tree.
    */
   private List<DataRefNode> getDataBlocksOfIndex(AbstractIndex abstractIndex) {
     List<DataRefNode> blocks = new LinkedList<DataRefNode>();