You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/01/30 10:50:48 UTC

[carbondata] branch branch-1.5 updated (213577d -> 72ede52)

This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a change to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git.


 discard 213577d  [DOC] Document Update for default sort scope
 discard 71957ad  [CARBONDATA-3280] Fix the issue of SDK assert can't work
 discard 873d76b  [HOTFIX] SDV framework for presto cluster test suite
 discard 00b18916 [CARBONDATA-3282] set hadoop conf to thread local for file factory usage in presto carbon
 discard 5e1313a  [HOTFIX] Upgraded jars to work S3 with presto
 discard 9373bc4  [CARBONDATA-3273] [CARBONDATA-3274] Fix for SORT_SCOPE in CarbonLoadDataCommand
 discard ee40cd4  [CARBONDATA-3272]fix ArrayIndexOutOfBoundsException of horizontal compaction during update, when cardinality changes within a segment
 discard 5d4b6f3  [HOTFIX] Fix select query on varchar column with large data fails with jvm crash
 discard 7e677a0  [CARBONDATA-3269] Fix ArrayIndexOutOfBoundsException of Range_Column when using KryoSerializer
 discard 0c072dc  [CARBONDATA-3268] Fix for query on Varchar Columns showing Null in Presto
 discard f2a045f  [CARBONDATA-3235] Fixed Alter Table Rename
 discard 296d34a  [CARBONDATA-3275][TEST] Fix errors in tests
 discard 2f106d9  [CARBONDATA-3264] Added SORT_SCOPE in ALTER TABLE SET
 discard d9d03e1  [CARBONDATA-3232] Add example and doc for alluxio integration
 discard 857288a  [CARBONDATA-3252] Remove unused import
 discard 39c50d4  [HOTFIX] presto carbon doesn't work with Hadoop conf in cluster.
 discard 7383da8  [CARBONDATA-3265] Fixed memory leak in Range Sort
 discard 1bdfed0  [CARBONDATA-3267]Fixed Range Sort OOM Issue
 discard 8b872eb  [CARBONDATA-3262] Fix merge index failure handling for compacted segment
 discard abcfad2  [CARBONDATA-3257] Fix for NO_SORT load and describe formatted being in NO_SORT flow even with Sort Columns given
 discard 6dc581a  [CARBONDATA-3263] Update doc for RANGE_COLUMN
 discard 4456a93  [DOC] Updated Presto guide
 discard 9235c76  [CARBONDATA-3246]Fix sdk reader issue if batch size is given as zero and vectorRead False.
 discard fdd8699  [DOC] Update range_info partition example in ddl description
 discard 8d8a35d  [CARBONDATA-3243] Updated DOC for No-Sort Compaction and a few Fixes
 discard 625f390  [CARBONDATA-3259] Added documentation for new complex delimiters
 discard 0d4a651  [CARBONDATA-3260] Fix the Hive stats issue in carbon catalog table
    omit a8235fa  [maven-release-plugin] prepare release apache-carbondata-1.5.2-rc1
     new 01c1ef6  [CARBONDATA-3260] Fix the Hive stats issue in carbon catalog table
     new 42e30b5  [CARBONDATA-3259] Added documentation for new complex delimiters
     new da235f2  [CARBONDATA-3243] Updated DOC for No-Sort Compaction and a few Fixes
     new 7b66f49  [DOC] Update range_info partition example in ddl description
     new 6c466cd  [CARBONDATA-3246]Fix sdk reader issue if batch size is given as zero and vectorRead False.
     new e0146e6  [DOC] Updated Presto guide
     new 045a614  [CARBONDATA-3263] Update doc for RANGE_COLUMN
     new a58fc00  [CARBONDATA-3257] Fix for NO_SORT load and describe formatted being in NO_SORT flow even with Sort Columns given
     new 68eb400  [CARBONDATA-3262] Fix merge index failure handling for compacted segment
     new 99c2e50  [CARBONDATA-3267]Fixed Range Sort OOM Issue
     new 59ad8fe  [CARBONDATA-3265] Fixed memory leak in Range Sort
     new 1aadaab  [HOTFIX] presto carbon doesn't work with Hadoop conf in cluster.
     new 9c8cd1a  [CARBONDATA-3252] Remove unused import
     new d288c4f  [CARBONDATA-3232] Add example and doc for alluxio integration
     new a4086f7  [CARBONDATA-3264] Added SORT_SCOPE in ALTER TABLE SET
     new ab2c161  [CARBONDATA-3275][TEST] Fix errors in tests
     new d703e3e  [CARBONDATA-3235] Fixed Alter Table Rename
     new 6b09bf7  [CARBONDATA-3268] Fix for query on Varchar Columns showing Null in Presto
     new f8bd229  [CARBONDATA-3269] Fix ArrayIndexOutOfBoundsException of Range_Column when using KryoSerializer
     new df92df1  [HOTFIX] Fix select query on varchar column with large data fails with jvm crash
     new b71731c  [CARBONDATA-3272]fix ArrayIndexOutOfBoundsException of horizontal compaction during update, when cardinality changes within a segment
     new 4a08439  [CARBONDATA-3273] [CARBONDATA-3274] Fix for SORT_SCOPE in CarbonLoadDataCommand
     new 5b4cd18  [HOTFIX] Upgraded jars to work S3 with presto
     new cd9d7b3  [CARBONDATA-3282] set hadoop conf to thread local for file factory usage in presto carbon
     new 64ae35a  [HOTFIX] SDV framework for presto cluster test suite
     new 2500a90  [CARBONDATA-3280] Fix the issue of SDK assert can't work
     new 72ede52  [DOC] Document Update for default sort scope

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (213577d)
            \
             N -- N -- N   refs/heads/branch-1.5 (72ede52)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 27 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 assembly/pom.xml                      | 2 +-
 common/pom.xml                        | 2 +-
 core/pom.xml                          | 2 +-
 datamap/bloom/pom.xml                 | 2 +-
 datamap/examples/pom.xml              | 2 +-
 datamap/lucene/pom.xml                | 2 +-
 datamap/mv/core/pom.xml               | 2 +-
 datamap/mv/plan/pom.xml               | 2 +-
 examples/spark2/pom.xml               | 2 +-
 format/pom.xml                        | 2 +-
 hadoop/pom.xml                        | 2 +-
 integration/hive/pom.xml              | 2 +-
 integration/presto/pom.xml            | 2 +-
 integration/spark-common-test/pom.xml | 2 +-
 integration/spark-common/pom.xml      | 2 +-
 integration/spark-datasource/pom.xml  | 2 +-
 integration/spark2/pom.xml            | 2 +-
 pom.xml                               | 4 ++--
 processing/pom.xml                    | 2 +-
 store/sdk/pom.xml                     | 2 +-
 streaming/pom.xml                     | 2 +-
 tools/cli/pom.xml                     | 2 +-
 22 files changed, 23 insertions(+), 23 deletions(-)


[carbondata] 16/27: [CARBONDATA-3275][TEST] Fix errors in tests

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit ab2c161f0cfd2e7272cb362237265b493456a0b0
Author: xubo245 <xu...@huawei.com>
AuthorDate: Sat Jan 26 21:21:22 2019 +0800

    [CARBONDATA-3275][TEST] Fix errors in tests
    
    Fix 4 errors in CI after PR 3094 merged
    
    This closes #3105
---
 .../sql/execution/command/table/CarbonDescribeFormattedCommand.scala    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
index e541139..1a1473b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
@@ -61,7 +61,7 @@ private[sql] case class CarbonDescribeFormattedCommand(
       "NO_SORT"
     } else {
       if (tblProps.contains(CarbonCommonConstants.SORT_SCOPE)) {
-        tblProps.get(CarbonCommonConstants.SORT_SCOPE).toString
+        tblProps.get(CarbonCommonConstants.SORT_SCOPE).get
       } else {
         tblProps
           .getOrElse(CarbonCommonConstants.SORT_SCOPE,


[carbondata] 15/27: [CARBONDATA-3264] Added SORT_SCOPE in ALTER TABLE SET

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit a4086f7cb934a3dbf7ef1e8fee98ee07a4fb302e
Author: namanrastogi <na...@gmail.com>
AuthorDate: Tue Jan 22 11:42:40 2019 +0530

    [CARBONDATA-3264] Added SORT_SCOPE in ALTER TABLE SET
    
    Added SORT_SCOPE in ALTER TABLE SET Command.
    This command changes the SORT_SCOPE of table after table has been created.
    
    Usage:
    
    ALTER TABLE <table> SET TBLPROPERTIES('sort_scope'='no_sort')
    Restrictions:
    
    Cannot change SORT_SCOPE from NO_SORT to anything else when SORT_COLUMNS is empty.
    
    This closes #3094
---
 docs/ddl-of-carbondata.md                          | 58 +++++++++++-------
 .../org/apache/spark/util/AlterTableUtil.scala     | 33 +++++++++--
 .../restructure/AlterTableValidationTestCase.scala | 69 ++++++++++++++++++++++
 3 files changed, 134 insertions(+), 26 deletions(-)

diff --git a/docs/ddl-of-carbondata.md b/docs/ddl-of-carbondata.md
index 4f9e47b..0d0e5bd 100644
--- a/docs/ddl-of-carbondata.md
+++ b/docs/ddl-of-carbondata.md
@@ -51,7 +51,7 @@ CarbonData DDL statements are documented here,which includes:
     * [RENAME COLUMN](#change-column-nametype)
     * [CHANGE COLUMN NAME/TYPE](#change-column-nametype)
     * [MERGE INDEXES](#merge-index)
-    * [SET/UNSET Local Dictionary Properties](#set-and-unset-for-local-dictionary-properties)
+    * [SET/UNSET](#set-and-unset)
   * [DROP TABLE](#drop-table)
   * [REFRESH TABLE](#refresh-table)
   * [COMMENTS](#table-and-column-comment)
@@ -634,7 +634,7 @@ CarbonData DDL statements are documented here,which includes:
 
   The following section introduce the commands to modify the physical or logical state of the existing table(s).
 
-   - ##### RENAME TABLE
+   - #### RENAME TABLE
    
      This command is used to rename the existing table.
      ```
@@ -648,7 +648,7 @@ CarbonData DDL statements are documented here,which includes:
      ALTER TABLE test_db.carbon RENAME TO test_db.carbonTable
      ```
 
-   - ##### ADD COLUMNS
+   - #### ADD COLUMNS
    
      This command is used to add a new column to the existing table.
      ```
@@ -676,7 +676,7 @@ Users can specify which columns to include and exclude for local dictionary gene
      ALTER TABLE carbon ADD COLUMNS (a1 STRING, b1 STRING) TBLPROPERTIES('LOCAL_DICTIONARY_INCLUDE'='a1','LOCAL_DICTIONARY_EXCLUDE'='b1')
      ```
 
-   - ##### DROP COLUMNS
+   - #### DROP COLUMNS
    
      This command is used to delete the existing column(s) in a table.
 
@@ -696,7 +696,7 @@ Users can specify which columns to include and exclude for local dictionary gene
 
      **NOTE:** Drop Complex child column is not supported.
 
-   - ##### CHANGE COLUMN NAME/TYPE
+   - #### CHANGE COLUMN NAME/TYPE
    
      This command is used to change column name and the data type from INT to BIGINT or decimal precision from lower to higher.
      Change of decimal data type from lower precision to higher precision will only be supported for cases where there is no data loss.
@@ -729,7 +729,8 @@ Users can specify which columns to include and exclude for local dictionary gene
      ```
 
      **NOTE:** Once the column is renamed, user has to take care about replacing the fileheader with the new name or changing the column header in csv file.
-- ##### MERGE INDEX
+   
+   - #### MERGE INDEX
 
      This command is used to merge all the CarbonData index files (.carbonindex) inside a segment to a single CarbonData index merge file (.carbonindexmerge). This enhances the first query performance.
 
@@ -747,23 +748,36 @@ Users can specify which columns to include and exclude for local dictionary gene
 
      * Merge index is not supported on streaming table.
 
-- ##### SET and UNSET for Local Dictionary Properties
-
-   When set command is used, all the newly set properties will override the corresponding old properties if exists.
-  
-   Example to SET Local Dictionary Properties:
-   ```
-   ALTER TABLE tablename SET TBLPROPERTIES('LOCAL_DICTIONARY_ENABLE'='false','LOCAL_DICTIONARY_THRESHOLD'='1000','LOCAL_DICTIONARY_INCLUDE'='column1','LOCAL_DICTIONARY_EXCLUDE'='column2')
-   ```
-   When Local Dictionary properties are unset, corresponding default values will be used for these properties.
+   - #### SET and UNSET
    
-   Example to UNSET Local Dictionary Properties:
-   ```
-   ALTER TABLE tablename UNSET TBLPROPERTIES('LOCAL_DICTIONARY_ENABLE','LOCAL_DICTIONARY_THRESHOLD','LOCAL_DICTIONARY_INCLUDE','LOCAL_DICTIONARY_EXCLUDE')
-   ```
-   
-   **NOTE:** For old tables, by default, local dictionary is disabled. If user wants local dictionary for these tables, user can enable/disable local dictionary for new data at their discretion. 
-   This can be achieved by using the alter table set command.
+     When set command is used, all the newly set properties will override the corresponding old properties if exists.
+  
+     - ##### Local Dictionary Properties
+       Example to SET Local Dictionary Properties:
+       ```
+       ALTER TABLE tablename SET TBLPROPERTIES('LOCAL_DICTIONARY_ENABLE'='false','LOCAL_DICTIONARY_THRESHOLD'='1000','LOCAL_DICTIONARY_INCLUDE'='column1','LOCAL_DICTIONARY_EXCLUDE'='column2')
+       ```
+       When Local Dictionary properties are unset, corresponding default values will be used for these properties.
+    
+       Example to UNSET Local Dictionary Properties:
+       ```
+       ALTER TABLE tablename UNSET TBLPROPERTIES('LOCAL_DICTIONARY_ENABLE','LOCAL_DICTIONARY_THRESHOLD','LOCAL_DICTIONARY_INCLUDE','LOCAL_DICTIONARY_EXCLUDE')
+       ```
+    
+       **NOTE:** For old tables, by default, local dictionary is disabled. If user wants local dictionary for these tables, user can enable/disable local dictionary for new data at their discretion.
+       This can be achieved by using the alter table set command.
+  
+     - ##### SORT SCOPE
+       Example to SET SORT SCOPE:
+       ```
+       ALTER TABLE tablename SET TBLPROPERTIES('SORT_SCOPE'='NO_SORT')
+       ```
+       When Sort Scope is unset, the default values (NO_SORT) will be used.
+    
+       Example to UNSET SORT SCOPE:
+       ```
+       ALTER TABLE tablename UNSET TBLPROPERTIES('SORT_SCOPE')
+       ```
 
 ### DROP TABLE
 
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 045d2d5..1dc562dc 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder
 import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.InvalidConfigurationException
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
@@ -360,6 +361,9 @@ object AlterTableUtil {
       // validate the range column properties
       validateRangeColumnProperties(carbonTable, lowerCasePropertiesMap)
 
+      // validate the Sort Scope
+      validateSortScopeProperty(carbonTable, lowerCasePropertiesMap)
+
       // below map will be used for cache invalidation. As tblProperties map is getting modified
       // in the next few steps the original map need to be retained for any decision making
       val existingTablePropertiesMap = mutable.Map(tblPropertiesMap.toSeq: _*)
@@ -387,11 +391,14 @@ object AlterTableUtil {
             // older tables. So no need to remove from table properties map for unset just to ensure
             // for older table behavior. So in case of unset, if enable property is already present
             // in map, then just set it to default value of local dictionary which is true.
-            if (!propKey.equalsIgnoreCase(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE)) {
-              tblPropertiesMap.remove(propKey.toLowerCase)
-            } else {
+            if (propKey.equalsIgnoreCase(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE)) {
               tblPropertiesMap
                 .put(propKey.toLowerCase, CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT)
+            } else if (propKey.equalsIgnoreCase("sort_scope")) {
+              tblPropertiesMap
+                .put(propKey.toLowerCase, CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
+            } else {
+              tblPropertiesMap.remove(propKey.toLowerCase)
             }
           } else {
             val errorMessage = "Error: Invalid option(s): " + propKey
@@ -432,7 +439,8 @@ object AlterTableUtil {
       "LOCAL_DICTIONARY_INCLUDE",
       "LOCAL_DICTIONARY_EXCLUDE",
       "LOAD_MIN_SIZE_INMB",
-      "RANGE_COLUMN")
+      "RANGE_COLUMN",
+      "SORT_SCOPE")
     supportedOptions.contains(propKey.toUpperCase)
   }
 
@@ -534,6 +542,23 @@ object AlterTableUtil {
     }
   }
 
+  def validateSortScopeProperty(carbonTable: CarbonTable,
+      propertiesMap: mutable.Map[String, String]): Unit = {
+    propertiesMap.foreach { property =>
+      if (property._1.equalsIgnoreCase("SORT_SCOPE")) {
+        if (!CarbonUtil.isValidSortOption(property._2)) {
+          throw new MalformedCarbonCommandException(
+            s"Invalid SORT_SCOPE ${ property._2 }, valid SORT_SCOPE are 'NO_SORT', 'BATCH_SORT', " +
+            s"'LOCAL_SORT' and 'GLOBAL_SORT'")
+        } else if (!property._2.equalsIgnoreCase("NO_SORT") &&
+                   (carbonTable.getNumberOfSortColumns == 0)) {
+          throw new InvalidConfigurationException(
+            s"Cannot set SORT_SCOPE as ${ property._2 } when table has no SORT_COLUMNS")
+        }
+      }
+    }
+  }
+
   /**
    * This method will validate if there is any complex type column in the columns to be cached
    *
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
index b77fdc8..10afa87 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
@@ -775,6 +775,75 @@ test("test alter command for boolean data type with correct default measure valu
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, timestampFormat)
   }
 
+
+  test("Alter Table Change Sort Scope 1") {
+    sql("DROP TABLE IF EXISTS t1")
+    sql(s"CREATE TABLE t1(age int, name string) STORED BY 'carbondata' TBLPROPERTIES" +
+        s"('sort_columns'='age', 'sort_scope'='local_sort')")
+    sql("ALTER TABLE t1 SET TBLPROPERTIES('sort_scope'='batch_sort')")
+    assert(sortScopeInDescFormatted("t1").equalsIgnoreCase("BATCH_SORT"))
+    sql("DROP TABLE t1")
+  }
+
+  test("Alter Table Change Sort Scope 2") {
+    sql("DROP TABLE IF EXISTS t1")
+    sql(s"CREATE TABLE t1(age int, name string) STORED BY 'carbondata' TBLPROPERTIES" +
+        s"('sort_columns'='age', 'sort_scope'='local_sort')")
+    sql("ALTER TABLE t1 SET TBLPROPERTIES('sort_scope'='no_sort')")
+    assert(sortScopeInDescFormatted("t1").equalsIgnoreCase("NO_SORT"))
+    sql("DROP TABLE t1")
+  }
+
+  test("Alter Table Change Sort Scope 3") {
+    sql("DROP TABLE IF EXISTS t1")
+    sql(s"CREATE TABLE t1(age int, name string) STORED BY 'carbondata' TBLPROPERTIES" +
+        s"('sort_columns'='')")
+
+    // This throws exception as SORT_COLUMNS is empty
+    intercept[RuntimeException] {
+      sql("ALTER TABLE t1 SET TBLPROPERTIES('sort_scope'='local_sort')")
+    }
+
+    // Even if we change the SORT_SCOPE to LOCAL_SORT
+    // the SORT_SCOPE should remain to NO_SORT
+    // because SORT_COLUMNS does not contain anything.
+    assert(sortScopeInDescFormatted("t1").equalsIgnoreCase("NO_SORT"))
+    sql("DROP TABLE t1")
+  }
+
+  test("Alter Table Change Sort Scope 4") {
+    sql("DROP TABLE IF EXISTS t1")
+    sql(s"CREATE TABLE t1(age int, name string) STORED BY 'carbondata' TBLPROPERTIES" +
+        s"('sort_columns'='age', 'sort_scope'='local_sort')")
+    sql("ALTER TABLE t1 UNSET TBLPROPERTIES('sort_scope')")
+
+    // Unsetting the SORT_SCOPE should change the SORT_SCOPE to
+    // CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT
+    assert(sortScopeInDescFormatted("t1")
+      .equalsIgnoreCase(CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
+    sql("DROP TABLE t1")
+  }
+
+  test("Alter Table Change Sort Scope 5") {
+    sql("DROP TABLE IF EXISTS t1")
+    sql(s"CREATE TABLE t1(age int, name string) STORED BY 'carbondata' TBLPROPERTIES" +
+        s"('sort_scope'='local_sort', 'sort_columns'='age')")
+    intercept[RuntimeException] {
+      sql("ALTER TABLE t1 SET TBLPROPERTIES('sort_scope'='fake_sort')")
+    }
+
+    // SORT_SCOPE should remain unchanged
+    assert(sortScopeInDescFormatted("t1").equalsIgnoreCase("LOCAL_SORT"))
+    sql("DROP TABLE t1")
+  }
+
+  def sortScopeInDescFormatted(tableName: String): String = {
+    sql(s"DESCRIBE FORMATTED $tableName").filter(
+      (x: Row) => x.getString(0).equalsIgnoreCase("sort scope")
+    ).collectAsList().get(0).get(1).toString
+  }
+
+
   override def afterAll {
     sql("DROP TABLE IF EXISTS restructure")
     sql("drop table if exists table1")


[carbondata] 22/27: [CARBONDATA-3273] [CARBONDATA-3274] Fix for SORT_SCOPE in CarbonLoadDataCommand

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit 4a08439a893ddd271b590582b3da979f2be6dfc9
Author: namanrastogi <na...@gmail.com>
AuthorDate: Fri Jan 25 15:19:58 2019 +0530

    [CARBONDATA-3273] [CARBONDATA-3274] Fix for SORT_SCOPE in CarbonLoadDataCommand
    
    Problem1: With no SORT_COLUMNS, loading data was taking SORT_SCOPE=LOCAL_SORT instead of NO_SORT.
    Solution: Added a check for SORT_COLUMNS in CarbonLoadDataCommand
    
    Problem2: On table with some SORT_COLUMNS and SORT_SCOPE not specified, SORT_SCOPE was not considering CARBON.OPTIONS.SORT.SCOPE for SORT_SCOPE.
    Solution: Added checking of CARBON.OPTIONS.SORT.SCOPE while loading.
    
    This closes #3103
---
 docs/configuration-parameters.md                   |  2 +-
 docs/dml-of-carbondata.md                          | 18 ++++++++++----
 .../command/management/CarbonLoadDataCommand.scala | 28 +++++++++++-----------
 3 files changed, 28 insertions(+), 20 deletions(-)

diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index d28ad61..9f13e97 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -208,7 +208,7 @@ RESET
 | carbon.options.date.format                 | Specifies the data format of the date columns in the data being loaded |
 | carbon.options.timestamp.format            | Specifies the timestamp format of the time stamp columns in the data being loaded |
 | carbon.options.sort.scope                 | Specifies how the current data load should be sorted with. **NOTE:** Refer to [Data Loading Configuration](#data-loading-configuration)#carbon.sort.scope for detailed information. |
-| carbon.table.load.sort.scope              | Overrides the SORT_SCOPE provided in CREATE TABLE.           |
+| carbon.table.load.sort.scope.<db>.<table> | Overrides the SORT_SCOPE provided in CREATE TABLE.           |
 | carbon.options.global.sort.partitions     |                                                              |
 | carbon.options.serialization.null.format  | Default Null value representation in the data being loaded. **NOTE:** Refer to [Data Loading Configuration](#data-loading-configuration)#carbon.options.serialization.null.format for detailed information. |
 | carbon.query.directQueryOnDataMap.enabled | Specifies whether datamap can be queried directly. This is useful for debugging purposes.**NOTE: **Refer to [Query Configuration](#query-configuration) for detailed information. |
diff --git a/docs/dml-of-carbondata.md b/docs/dml-of-carbondata.md
index b3fe517..ec2c053 100644
--- a/docs/dml-of-carbondata.md
+++ b/docs/dml-of-carbondata.md
@@ -109,11 +109,19 @@ CarbonData DML statements are documented here,which includes:
     ```
 
   - ##### SORT_SCOPE:
-  Sort Scope to be used for the current load. This overrides the Sort Scope of Table.
-
-  ```
-  OPTIONS('SORT_SCOPE'='BATCH_SORT')
-  ```
+    Sort Scope to be used for the current load. This overrides the Sort Scope of Table.
+    Requirement: Sort Columns must be set while creating table. If Sort Columns is null, Sort Scope is always NO_SORT.
+  
+    ```
+    OPTIONS('SORT_SCOPE'='BATCH_SORT')
+    ```
+    
+    Priority order for choosing Sort Scope is:
+    1. Load Data Command
+    2. CARBON.TABLE.LOAD.SORT.SCOPE.<db>.<table> session property
+    3. Table level Sort Scope
+    4. CARBON.OPTIONS.SORT.SCOPE session property
+    5. Default Value: NO_SORT
 
   - ##### MULTILINE:
 
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 7e3ea90..307e62d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -200,26 +200,26 @@ case class CarbonLoadDataCommand(
     *     LOAD DATA INPATH 'data.csv' INTO TABLE tableName OPTIONS('sort_scope'='no_sort')
     *
     * 2. Session property CARBON_TABLE_LOAD_SORT_SCOPE  ->
-    *     SET CARBON.TABLE.LOAD.SORT.SCOPE.database.table=no_sort
-    *     SET CARBON.TABLE.LOAD.SORT.SCOPE.database.table=batch_sort
     *     SET CARBON.TABLE.LOAD.SORT.SCOPE.database.table=local_sort
-    *     SET CARBON.TABLE.LOAD.SORT.SCOPE.database.table=global_sort
     *
     * 3. Sort Scope provided in TBLPROPERTIES
     * 4. Session property CARBON_OPTIONS_SORT_SCOPE
     * 5. Default Sort Scope LOAD_SORT_SCOPE
     */
-    if (tableProperties.get(CarbonCommonConstants.SORT_COLUMNS) != null &&
-        tableProperties.get(CarbonCommonConstants.SORT_SCOPE) == null) {
-      // If there are Sort Columns given for the table and Sort Scope is not specified,
-      // we will take it as whichever sort scope given or LOCAL_SORT as default
-      optionsFinal
-        .put(CarbonCommonConstants.SORT_SCOPE,
-          carbonProperty
-            .getProperty(
-              CarbonLoadOptionConstants.CARBON_TABLE_LOAD_SORT_SCOPE + table.getDatabaseName + "." +
-              table.getTableName, carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
-                SortScopeOptions.getSortScope("LOCAL_SORT").toString)))
+    if (StringUtils.isBlank(tableProperties.get(CarbonCommonConstants.SORT_COLUMNS))) {
+      // If tableProperties.SORT_COLUMNS is null
+      optionsFinal.put(CarbonCommonConstants.SORT_SCOPE,
+        SortScopeOptions.SortScope.NO_SORT.name)
+    } else if (StringUtils.isBlank(tableProperties.get(CarbonCommonConstants.SORT_SCOPE))) {
+      // If tableProperties.SORT_COLUMNS is not null
+      // and tableProperties.SORT_SCOPE is null
+      optionsFinal.put(CarbonCommonConstants.SORT_SCOPE,
+        options.getOrElse(CarbonCommonConstants.SORT_SCOPE,
+          carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_TABLE_LOAD_SORT_SCOPE +
+            table.getDatabaseName + "." + table.getTableName,
+            carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
+              carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+                SortScopeOptions.SortScope.LOCAL_SORT.name)))))
     } else {
       optionsFinal.put(CarbonCommonConstants.SORT_SCOPE,
         options.getOrElse(CarbonCommonConstants.SORT_SCOPE,


[carbondata] 09/27: [CARBONDATA-3262] Fix merge index failure handling for compacted segment

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit 68eb400d2cadac89577672a9f047d4476563bd40
Author: kunal642 <ku...@gmail.com>
AuthorDate: Mon Jan 21 20:45:24 2019 +0530

    [CARBONDATA-3262] Fix merge index failure handling for compacted segment
    
    Problem: When merge index file writing fails, the load details for the segments being merged is wrongly written with the merged segment file name.
    Due to this when the next load happens the segment file for merged segment is deleted.
    Solution: Dont throw exception when merge index fails.
    
    This closes #3090
---
 .../core/writer/CarbonIndexFileMergeWriter.java    | 47 +++++++++++++---------
 .../spark/rdd/CarbonDataRDDFactory.scala           |  9 +++--
 .../command/management/CarbonLoadDataCommand.scala |  9 +++--
 .../processing/loading/model/CarbonLoadModel.java  |  6 +++
 4 files changed, 45 insertions(+), 26 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
index 9dde6b7..80e0af5 100644
--- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
@@ -39,6 +40,7 @@ import org.apache.carbondata.format.MergedBlockIndex;
 import org.apache.carbondata.format.MergedBlockIndexHeader;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
 
 public class CarbonIndexFileMergeWriter {
 
@@ -52,6 +54,8 @@ public class CarbonIndexFileMergeWriter {
    */
   private ThriftWriter thriftWriter;
 
+  private Logger LOGGER = LogServiceFactory.getLogService(this.getClass().getCanonicalName());
+
   public CarbonIndexFileMergeWriter(CarbonTable table) {
     this.table = table;
   }
@@ -68,27 +72,32 @@ public class CarbonIndexFileMergeWriter {
    */
   private String mergeCarbonIndexFilesOfSegment(String segmentId,
       String tablePath, List<String> indexFileNamesTobeAdded,
-      boolean readFileFooterFromCarbonDataFile, String uuid) throws IOException {
-    Segment segment = Segment.getSegment(segmentId, tablePath);
-    String segmentPath = CarbonTablePath.getSegmentPath(tablePath, segmentId);
-    CarbonFile[] indexFiles;
-    SegmentFileStore sfs = null;
-    if (segment != null && segment.getSegmentFileName() != null) {
-      sfs = new SegmentFileStore(tablePath, segment.getSegmentFileName());
-      List<CarbonFile> indexCarbonFiles = sfs.getIndexCarbonFiles();
-      indexFiles = indexCarbonFiles.toArray(new CarbonFile[indexCarbonFiles.size()]);
-    } else {
-      indexFiles =
-          SegmentIndexFileStore.getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration());
-    }
-    if (isCarbonIndexFilePresent(indexFiles) || indexFileNamesTobeAdded != null) {
-      if (sfs == null) {
-        return writeMergeIndexFileBasedOnSegmentFolder(indexFileNamesTobeAdded,
-            readFileFooterFromCarbonDataFile, segmentPath, indexFiles, segmentId);
+      boolean readFileFooterFromCarbonDataFile, String uuid) {
+    try {
+      Segment segment = Segment.getSegment(segmentId, tablePath);
+      String segmentPath = CarbonTablePath.getSegmentPath(tablePath, segmentId);
+      CarbonFile[] indexFiles;
+      SegmentFileStore sfs = null;
+      if (segment != null && segment.getSegmentFileName() != null) {
+        sfs = new SegmentFileStore(tablePath, segment.getSegmentFileName());
+        List<CarbonFile> indexCarbonFiles = sfs.getIndexCarbonFiles();
+        indexFiles = indexCarbonFiles.toArray(new CarbonFile[indexCarbonFiles.size()]);
       } else {
-        return writeMergeIndexFileBasedOnSegmentFile(
-            segmentId, indexFileNamesTobeAdded, sfs, indexFiles, uuid);
+        indexFiles =
+            SegmentIndexFileStore.getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration());
+      }
+      if (isCarbonIndexFilePresent(indexFiles) || indexFileNamesTobeAdded != null) {
+        if (sfs == null) {
+          return writeMergeIndexFileBasedOnSegmentFolder(indexFileNamesTobeAdded,
+              readFileFooterFromCarbonDataFile, segmentPath, indexFiles, segmentId);
+        } else {
+          return writeMergeIndexFileBasedOnSegmentFile(segmentId, indexFileNamesTobeAdded, sfs,
+              indexFiles, uuid);
+        }
       }
+    } catch (Exception e) {
+      LOGGER.error(
+          "Failed to merge index files in path: " + tablePath, e);
     }
     return null;
   }
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 02acde6..8268379 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -579,7 +579,8 @@ object CarbonDataRDDFactory {
         }
         val compactedSegments = new util.ArrayList[String]()
         handleSegmentMerging(sqlContext,
-          carbonLoadModel,
+          carbonLoadModel
+            .getCopyWithPartition(carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter),
           carbonTable,
           compactedSegments,
           operationContext)
@@ -587,8 +588,10 @@ object CarbonDataRDDFactory {
         writtenSegment
       } catch {
         case e: Exception =>
-          throw new Exception(
-            "Dataload is success. Auto-Compaction has failed. Please check logs.")
+          LOGGER.error(
+            "Auto-Compaction has failed. Ignoring this exception because the" +
+            " load is passed.", e)
+          writtenSegment
       }
     }
   }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 242a467..7e3ea90 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -874,16 +874,17 @@ case class CarbonLoadDataCommand(
       // Trigger auto compaction
       CarbonDataRDDFactory.handleSegmentMerging(
         sparkSession.sqlContext,
-        carbonLoadModel,
+        carbonLoadModel
+          .getCopyWithPartition(carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter),
         table,
         compactedSegments,
         operationContext)
       carbonLoadModel.setMergedSegmentIds(compactedSegments)
     } catch {
       case e: Exception =>
-        throw new Exception(
-          "Dataload is success. Auto-Compaction has failed. Please check logs.",
-          e)
+        LOGGER.error(
+          "Auto-Compaction has failed. Ignoring this exception because the " +
+          "load is passed.", e)
     }
     val specs =
       SegmentFileStore.getPartitionSpecs(carbonLoadModel.getSegmentId, carbonLoadModel.getTablePath)
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index ccf6eb2..1ce8aae 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -488,6 +488,9 @@ public class CarbonLoadModel implements Serializable {
     copy.parentTablePath = parentTablePath;
     copy.sdkWriterCores = sdkWriterCores;
     copy.columnCompressor = columnCompressor;
+    copy.rangePartitionColumn = rangePartitionColumn;
+    copy.scaleFactor = scaleFactor;
+    copy.totalSize = totalSize;
     return copy;
   }
 
@@ -544,6 +547,9 @@ public class CarbonLoadModel implements Serializable {
     copyObj.parentTablePath = parentTablePath;
     copyObj.sdkWriterCores = sdkWriterCores;
     copyObj.columnCompressor = columnCompressor;
+    copyObj.rangePartitionColumn = rangePartitionColumn;
+    copyObj.scaleFactor = scaleFactor;
+    copyObj.totalSize = totalSize;
     return copyObj;
   }
 


[carbondata] 13/27: [CARBONDATA-3252] Remove unused import

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit 9c8cd1a5887a2e583c24b7d2dd3ae63186d236a1
Author: gouyangyangy <go...@163.com>
AuthorDate: Mon Jan 21 14:51:43 2019 +0800

    [CARBONDATA-3252] Remove unused import
    
    Delete unused packages
    
    This closes #3087
---
 .../spark/testsuite/badrecordloger/BadRecordLoggerTest.scala          | 4 +---
 .../directdictionary/TimestampNoDictionaryColumnTestCase.scala        | 2 --
 2 files changed, 1 insertion(+), 5 deletions(-)

diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala
index 5c28cbd..5d4ceaf 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala
@@ -21,10 +21,9 @@ import java.io.{File, FileFilter}
 
 import org.apache.commons.io.FileUtils
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.hive.HiveContext
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.constants.{CarbonCommonConstants}
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.spark.sql.test.util.QueryTest
 
@@ -36,7 +35,6 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
  *
  */
 class BadRecordLoggerTest extends QueryTest with BeforeAndAfterAll {
-  var hiveContext: HiveContext = _
 
   override def beforeAll {
     try {
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/directdictionary/TimestampNoDictionaryColumnTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/directdictionary/TimestampNoDictionaryColumnTestCase.scala
index e8a465a..f46b467 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/directdictionary/TimestampNoDictionaryColumnTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/directdictionary/TimestampNoDictionaryColumnTestCase.scala
@@ -20,10 +20,8 @@ package org.apache.carbondata.spark.testsuite.directdictionary
 import java.sql.Timestamp
 
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.hive.HiveContext
 import org.scalatest.BeforeAndAfterAll
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.TimeStampGranularityConstants
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.spark.sql.test.util.QueryTest
 


[carbondata] 19/27: [CARBONDATA-3269] Fix ArrayIndexOutOfBoundsException of Range_Column when using KryoSerializer

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit f8bd2292f6de5a2f1d295a9170959aea63ac3004
Author: QiangCai <qi...@qq.com>
AuthorDate: Fri Jan 25 14:29:55 2019 +0800

    [CARBONDATA-3269] Fix ArrayIndexOutOfBoundsException of Range_Column when using KryoSerializer
    
    Fix ArrayIndexOutOfBoundsException of Range_Column when using KryoSerializer
    
    This closes #3100
---
 .../src/main/scala/org/apache/spark/DataSkewRangePartitioner.scala    | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/integration/spark-common/src/main/scala/org/apache/spark/DataSkewRangePartitioner.scala b/integration/spark-common/src/main/scala/org/apache/spark/DataSkewRangePartitioner.scala
index 5f5c376..12285d3 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/DataSkewRangePartitioner.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/DataSkewRangePartitioner.scala
@@ -202,7 +202,7 @@ class DataSkewRangePartitioner[K: Ordering : ClassTag, V](
     }
   }
 
-  private val skewPartitions: Int = if (skewCount == 0) {
+  private var skewPartitions: Int = if (skewCount == 0) {
     0
   } else {
     skewWeights.map(_ - 1).sum
@@ -307,6 +307,7 @@ class DataSkewRangePartitioner[K: Ordering : ClassTag, V](
         case js: JavaSerializer => out.defaultWriteObject()
         case _ =>
           out.writeInt(skewCount)
+          out.writeInt(skewPartitions)
           if (skewCount > 0) {
             out.writeObject(skewIndexes)
             out.writeObject(skewWeights)
@@ -332,6 +333,7 @@ class DataSkewRangePartitioner[K: Ordering : ClassTag, V](
         case js: JavaSerializer => in.defaultReadObject()
         case _ =>
           skewCount = in.readInt()
+          skewPartitions = in.readInt()
           if (skewCount > 0) {
             skewIndexes = in.readObject().asInstanceOf[Array[Int]]
             skewWeights = in.readObject().asInstanceOf[Array[Int]]


[carbondata] 11/27: [CARBONDATA-3265] Fixed memory leak in Range Sort

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit 59ad8fe608f062e53c23fd5d6392ba2847b3db93
Author: shivamasn <sh...@gmail.com>
AuthorDate: Tue Jan 22 15:28:10 2019 +0530

    [CARBONDATA-3265] Fixed memory leak in Range Sort
    
    In range sort, unsafe memory was not getting cleared in case of task failure.
    So, added a fix for memory leak.
    
    This closes #3095
---
 .../spark/load/DataLoadProcessBuilderOnSpark.scala | 29 ++++++++++++++++------
 1 file changed, 22 insertions(+), 7 deletions(-)

diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index a5d354a..77d0d84 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -20,7 +20,7 @@ package org.apache.carbondata.spark.load
 import java.util.Comparator
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.spark.{Accumulator, DataSkewRangePartitioner, RangePartitioner, TaskContext}
+import org.apache.spark.{Accumulator, DataSkewRangePartitioner, TaskContext}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, SparkSession}
@@ -29,18 +29,19 @@ import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.storage.StorageLevel
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.row.CarbonRow
 import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
 import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension}
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
-import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util._
 import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer
 import org.apache.carbondata.processing.loading.{CarbonDataLoadConfiguration, DataField, DataLoadProcessBuilder, FailureCauses}
 import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters}
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
+import org.apache.carbondata.spark.util.CommonUtil
 
 /**
  * Use sortBy operator in spark to load the data
@@ -127,9 +128,11 @@ object DataLoadProcessBuilderOnSpark {
       }
 
     // 4. Write
-    sc.runJob(sortRDD, (context: TaskContext, rows: Iterator[CarbonRow]) =>
+    sc.runJob(sortRDD, (context: TaskContext, rows: Iterator[CarbonRow]) => {
+      setTaskListener()
       DataLoadProcessorStepOnSpark.writeFunc(rows, context.partitionId, modelBroadcast,
-        writeStepRowCounter, conf.value.value))
+        writeStepRowCounter, conf.value.value)
+    })
 
     // clean cache only if persisted and keeping unpersist non-blocking as non-blocking call will
     // not have any functional impact as spark automatically monitors the cache usage on each node
@@ -221,9 +224,11 @@ object DataLoadProcessBuilderOnSpark {
       .map(_._2)
 
     // 4. Sort and Write data
-    sc.runJob(rangeRDD, (context: TaskContext, rows: Iterator[CarbonRow]) =>
+    sc.runJob(rangeRDD, (context: TaskContext, rows: Iterator[CarbonRow]) => {
+      setTaskListener()
       DataLoadProcessorStepOnSpark.sortAndWriteFunc(rows, context.partitionId, modelBroadcast,
-        writeStepRowCounter, conf.value.value))
+        writeStepRowCounter, conf.value.value)
+    })
 
     // Log the number of rows in each step
     LOGGER.info("Total rows processed in step Input Processor: " + inputStepRowCounter.value)
@@ -340,6 +345,16 @@ object DataLoadProcessBuilderOnSpark {
       new PrimtiveOrdering(column.getDataType)
     }
   }
+
+  def setTaskListener(): Unit = {
+    TaskContext.get.addTaskCompletionListener { _ =>
+      CommonUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId)
+    }
+    TaskMetricsMap.threadLocal.set(Thread.currentThread().getId)
+    val carbonTaskInfo = new CarbonTaskInfo
+    carbonTaskInfo.setTaskId(CarbonUtil.generateUUID())
+    ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo)
+  }
 }
 
 class PrimtiveOrdering(dataType: DataType) extends Ordering[Object] {


[carbondata] 27/27: [DOC] Document Update for default sort scope

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit 72ede5296cd045a81d0026f0e02e8584334f1b46
Author: namanrastogi <na...@gmail.com>
AuthorDate: Tue Jan 29 19:33:58 2019 +0530

    [DOC] Document Update for default sort scope
    
    This closes #3115
---
 docs/configuration-parameters.md | 8 ++++----
 docs/dml-of-carbondata.md        | 1 -
 2 files changed, 4 insertions(+), 5 deletions(-)

diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index 9f13e97..7b31413 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -61,7 +61,7 @@ This section provides the details of all the configurations required for the Car
 | carbon.number.of.cores.while.loading | 2 | Number of cores to be used while loading data. This also determines the number of threads to be used to read the input files (csv) in parallel.**NOTE:** This configured value is used in every data loading step to parallelize the operations. Configuring a higher value can lead to increased early thread pre-emption by OS and there by reduce the overall performance. |
 | enable.unsafe.sort | true | CarbonData supports unsafe operations of Java to avoid GC overhead for certain operations. This configuration enables to use unsafe functions in CarbonData. **NOTE:** For operations like data loading, which generates more short lived Java objects, Java GC can be a bottle neck. Using unsafe can overcome the GC overhead and improve the overall performance. |
 | enable.offheap.sort | true | CarbonData supports storing data in off-heap memory for certain operations during data loading and query. This helps to avoid the Java GC and thereby improve the overall performance. This configuration enables using off-heap memory for sorting of data during data loading.**NOTE:**  ***enable.unsafe.sort*** configuration needs to be configured to true for using off-heap |
-| carbon.load.sort.scope | LOCAL_SORT | CarbonData can support various sorting options to match the balance between load and query performance. LOCAL_SORT:All the data given to an executor in the single load is fully sorted and written to carbondata files. Data loading performance is reduced a little as the entire data needs to be sorted in the executor. BATCH_SORT:Sorts the data in batches of configured size and writes to carbondata files. Data loading performance increases as the entir [...]
+| carbon.load.sort.scope | LOCAL_SORT | CarbonData can support various sorting options to match the balance between load and query performance. LOCAL_SORT:All the data given to an executor in the single load is fully sorted and written to carbondata files. Data loading performance is reduced a little as the entire data needs to be sorted in the executor. BATCH_SORT:Sorts the data in batches of configured size and writes to carbondata files. Data loading performance increases as the entir [...]
 | carbon.load.batch.sort.size.inmb | 0 | When  ***carbon.load.sort.scope*** is configured as ***BATCH_SORT***, this configuration needs to be added to specify the batch size for sorting and writing to carbondata files. **NOTE:** It is recommended to keep the value around 45% of ***carbon.sort.storage.inmemory.size.inmb*** to avoid spill to disk. Also it is recommended to keep the value higher than ***carbon.blockletgroup.size.in.mb***. Refer to *carbon.load.sort.scope* for more informati [...]
 | carbon.global.sort.rdd.storage.level | MEMORY_ONLY | Storage level to persist dataset of RDD/dataframe when loading data with 'sort_scope'='global_sort', if user's executor has less memory, set this parameter to 'MEMORY_AND_DISK_SER' or other storage level to correspond to different environment. [See detail](http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence). |
 | carbon.load.global.sort.partitions | 0 | The number of partitions to use when shuffling data for global sort. Default value 0 means to use same number of map tasks as reduce tasks. **NOTE:** In general, it is recommended to have 2-3 tasks per CPU core in your cluster. |
@@ -207,9 +207,9 @@ RESET
 | enable.unsafe.sort                        | Specifies whether to use unsafe sort during data loading. Unsafe sort reduces the garbage collection during data load operation, resulting in better performance. |
 | carbon.options.date.format                 | Specifies the data format of the date columns in the data being loaded |
 | carbon.options.timestamp.format            | Specifies the timestamp format of the time stamp columns in the data being loaded |
-| carbon.options.sort.scope                 | Specifies how the current data load should be sorted with. **NOTE:** Refer to [Data Loading Configuration](#data-loading-configuration)#carbon.sort.scope for detailed information. |
-| carbon.table.load.sort.scope.<db>.<table> | Overrides the SORT_SCOPE provided in CREATE TABLE.           |
-| carbon.options.global.sort.partitions     |                                                              |
+| carbon.options.sort.scope                 | Specifies how the current data load should be sorted with. This sort parameter is at the table level. **NOTE:** Refer to [Data Loading Configuration](#data-loading-configuration)#carbon.sort.scope for detailed information. |
+| carbon.table.load.sort.scope.db_name.table_name | Overrides the SORT_SCOPE provided in CREATE TABLE.           |
+| carbon.options.global.sort.partitions     | Specifies the number of partitions to be used during global sort.   |
 | carbon.options.serialization.null.format  | Default Null value representation in the data being loaded. **NOTE:** Refer to [Data Loading Configuration](#data-loading-configuration)#carbon.options.serialization.null.format for detailed information. |
 | carbon.query.directQueryOnDataMap.enabled | Specifies whether datamap can be queried directly. This is useful for debugging purposes.**NOTE: **Refer to [Query Configuration](#query-configuration) for detailed information. |
 
diff --git a/docs/dml-of-carbondata.md b/docs/dml-of-carbondata.md
index ec2c053..f89c49a 100644
--- a/docs/dml-of-carbondata.md
+++ b/docs/dml-of-carbondata.md
@@ -491,7 +491,6 @@ CarbonData DML statements are documented here,which includes:
   ```
   ALTER TABLE table_name COMPACT 'CUSTOM' WHERE SEGMENT.ID IN (2,3,4)
   ```
-  NOTE: Compaction is unsupported for table containing Complex columns.
 
 
   - **CLEAN SEGMENTS AFTER Compaction**


[carbondata] 18/27: [CARBONDATA-3268] Fix for query on Varchar Columns showing Null in Presto

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit 6b09bf7044c6ac28769343e2d6a580ef4f99c8be
Author: manishnalla1994 <ma...@gmail.com>
AuthorDate: Fri Jan 25 10:54:13 2019 +0530

    [CARBONDATA-3268] Fix for query on Varchar Columns showing Null in Presto
    
    Problem: Select query on Varchar columns shows null in Presto as it was going to ObjectStreamReader instead of SliceStreamReader.
    
    Solution: Handled the scenario by adding a check for Varchar while creating StreamReader.
    
    This closes #3099
---
 .../carbondata/presto/CarbonVectorBatch.java       |  2 +-
 .../PrestoTestNonTransactionalTableFiles.scala     | 54 ++++++++++++++++------
 2 files changed, 42 insertions(+), 14 deletions(-)

diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java
index aa8b062..c1e2662 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java
@@ -95,7 +95,7 @@ public class CarbonVectorBatch {
       return new FloatStreamReader(batchSize, field.getDataType(), dictionary);
     } else if (dataType == DataTypes.BYTE) {
       return new ByteStreamReader(batchSize, field.getDataType(), dictionary);
-    } else if (dataType == DataTypes.STRING) {
+    } else if (dataType == DataTypes.STRING || dataType == DataTypes.VARCHAR) {
       return new SliceStreamReader(batchSize, field.getDataType(), dictionary);
     } else if (DataTypes.isDecimal(dataType)) {
       if (dataType instanceof DecimalType) {
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoTestNonTransactionalTableFiles.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoTestNonTransactionalTableFiles.scala
index e942dde..6d17b8b 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoTestNonTransactionalTableFiles.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoTestNonTransactionalTableFiles.scala
@@ -22,6 +22,7 @@ import java.sql.SQLException
 import java.util
 
 import org.apache.commons.io.FileUtils
+import org.apache.commons.lang.RandomStringUtils
 import org.scalatest.{BeforeAndAfterAll, FunSuiteLike}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -45,6 +46,7 @@ class PrestoTestNonTransactionalTableFiles extends FunSuiteLike with BeforeAndAf
   private val systemPath = s"$rootPath/integration/presto/target/system"
   private val writerPath = storePath + "/sdk_output/files"
   private val prestoServer = new PrestoServer
+  private var varcharString = new String
 
   override def beforeAll: Unit = {
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SYSTEM_FOLDER_LOCATION,
@@ -67,13 +69,13 @@ class PrestoTestNonTransactionalTableFiles extends FunSuiteLike with BeforeAndAf
     FileUtils.deleteDirectory(new File(writerPath))
     createTable
 
-    buildTestData(3, null)
+    buildTestData(3, null, true)
   }
 
   def buildTestDataMultipleFiles(): Any = {
     FileUtils.deleteDirectory(new File(writerPath))
     createTable
-    buildTestData(1000000, null)
+    buildTestData(1000000, null, false)
   }
 
   private def createTable = {
@@ -83,28 +85,40 @@ class PrestoTestNonTransactionalTableFiles extends FunSuiteLike with BeforeAndAf
     prestoServer
       .execute(
         "create table sdk_output.files(name varchar, age int, id tinyint, height double, salary " +
-        "real) with" +
+        "real, address varchar) with" +
         "(format='CARBON') ")
   }
 
-  def buildTestData(rows: Int, options: util.Map[String, String]): Any = {
-    buildTestData(rows, options, List("name"))
+  def buildTestData(rows: Int, options: util.Map[String, String], varcharDataGen: Boolean): Any = {
+    buildTestData(rows, options, List("name"), varcharDataGen)
   }
 
   // prepare sdk writer output
   def buildTestData(rows: Int,
       options: util.Map[String, String],
-      sortColumns: List[String]): Any = {
+      sortColumns: List[String],
+      varcharDataGen: Boolean): Any = {
     val schema = new StringBuilder()
       .append("[ \n")
       .append("   {\"NaMe\":\"string\"},\n")
       .append("   {\"age\":\"int\"},\n")
       .append("   {\"id\":\"byte\"},\n")
       .append("   {\"height\":\"double\"},\n")
-      .append("   {\"salary\":\"float\"}\n")
+      .append("   {\"salary\":\"float\"},\n")
+      .append("   {\"address\":\"varchar\"}\n")
       .append("]")
       .toString()
 
+    // Build Varchar Column data
+    var varcharValue: String = {
+      if (varcharDataGen) {
+        RandomStringUtils.randomAlphabetic(32001)
+      } else {
+        "a"
+      }
+    }
+
+    varcharString = varcharValue
     try {
       val builder = CarbonWriter.builder()
       val writer =
@@ -132,14 +146,16 @@ class PrestoTestNonTransactionalTableFiles extends FunSuiteLike with BeforeAndAf
               String.valueOf(i),
               String.valueOf(i.toDouble / 2),
               "robot",
-              String.valueOf(i.toFloat / 2)))
+              String.valueOf(i.toFloat / 2),
+              String.valueOf(varcharValue)))
         } else {
           writer
             .write(Array[String]("robot" + i,
               String.valueOf(i),
               String.valueOf(i % 128),
               String.valueOf(i.toDouble / 2),
-              String.valueOf(i.toFloat / 2)))
+              String.valueOf(i.toFloat / 2),
+              String.valueOf(varcharValue)))
         }
         i += 1
       }
@@ -150,7 +166,8 @@ class PrestoTestNonTransactionalTableFiles extends FunSuiteLike with BeforeAndAf
             String.valueOf(i),
             String.valueOf(i),
             String.valueOf(i.toDouble / 2),
-            String.valueOf(i.toFloat / 2)))
+            String.valueOf(i.toFloat / 2),
+            String.valueOf(varcharValue)))
       }
       writer.close()
     } catch {
@@ -281,17 +298,28 @@ class PrestoTestNonTransactionalTableFiles extends FunSuiteLike with BeforeAndAf
       "height" -> 0.0,
       "age" -> 0,
       "salary" -> 0.0,
-      "id" -> 0),
+      "id" -> 0,
+      "address" -> varcharString),
       Map("name" -> "robot1",
         "height" -> 0.5,
         "age" -> 1,
         "salary" -> 0.5,
-        "id" -> 1),
+        "id" -> 1,
+        "address" -> varcharString),
       Map("name" -> "robot2",
         "height" -> 1.0,
         "age" -> 2,
         "salary" -> 1.0,
-        "id" -> 2))
+        "id" -> 2,
+        "address" -> varcharString))
     assert(actualResult.toString() equals expectedResult.toString())
   }
+
+  test("Test for query on Varchar columns") {
+    buildTestDataSingleFile()
+    val actualRes: List[Map[String, Any]] = prestoServer.
+      executeQuery("select max(length(address)) from files")
+    val expectedRes: List[Map[String, Any]] = List(Map("_col0" -> 32001))
+    assert(actualRes.toString() equals expectedRes.toString())
+  }
 }
\ No newline at end of file


[carbondata] 17/27: [CARBONDATA-3235] Fixed Alter Table Rename

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit d703e3e3cb7f0605c69063d2057ae013030eef45
Author: namanrastogi <na...@gmail.com>
AuthorDate: Wed Jan 23 17:57:35 2019 +0530

    [CARBONDATA-3235] Fixed Alter Table Rename
    
    Fixed negative scenario: Alter Table Rename Table Fail
    
    Problem: When tabe rename is success in hive, for failed in carbon data store, it would throw exception, but would not go back and undo rename in hive.
    
    Solution: A flag to keep check if hive rename has already executed, and of the code breaks after hive rename is done, go back and undo the hive rename.
    
    This closes #3098
---
 .../schema/CarbonAlterTableRenameCommand.scala     | 34 +++++++++++-----------
 1 file changed, 17 insertions(+), 17 deletions(-)

diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index 01698c9..33f3cd9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -43,10 +43,12 @@ private[sql] case class CarbonAlterTableRenameCommand(
 
   override def processMetadata(sparkSession: SparkSession): Seq[Nothing] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-    val oldTableIdentifier = alterTableRenameModel.oldTableIdentifier
-    val newTableIdentifier = alterTableRenameModel.newTableIdentifier
-    val oldDatabaseName = oldTableIdentifier.database
+    val oldTableName = alterTableRenameModel.oldTableIdentifier.table.toLowerCase
+    val newTableName = alterTableRenameModel.newTableIdentifier.table.toLowerCase
+    val oldDatabaseName = alterTableRenameModel.oldTableIdentifier.database
       .getOrElse(sparkSession.catalog.currentDatabase)
+    val oldTableIdentifier = TableIdentifier(oldTableName, Some(oldDatabaseName))
+    val newTableIdentifier = TableIdentifier(newTableName, Some(oldDatabaseName))
     setAuditTable(oldDatabaseName, oldTableIdentifier.table)
     setAuditInfo(Map("newName" -> alterTableRenameModel.newTableIdentifier.table))
     val newDatabaseName = newTableIdentifier.database
@@ -59,8 +61,6 @@ private[sql] case class CarbonAlterTableRenameCommand(
       throw new MalformedCarbonCommandException(s"Table with name $newTableIdentifier " +
                                                 s"already exists")
     }
-    val oldTableName = oldTableIdentifier.table.toLowerCase
-    val newTableName = newTableIdentifier.table.toLowerCase
     LOGGER.info(s"Rename table request has been received for $oldDatabaseName.$oldTableName")
     val metastore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
     val relation: CarbonRelation =
@@ -108,8 +108,8 @@ private[sql] case class CarbonAlterTableRenameCommand(
         dataMapSchemaList.addAll(indexSchemas)
       }
       // invalid data map for the old table, see CARBON-1690
-      val oldTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-      DataMapStoreManager.getInstance().clearDataMaps(oldTableIdentifier)
+      val oldAbsoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
+      DataMapStoreManager.getInstance().clearDataMaps(oldAbsoluteTableIdentifier)
       // get the latest carbon table and check for column existence
       val operationContext = new OperationContext
       // TODO: Pass new Table Path in pre-event.
@@ -125,7 +125,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
       schemaEvolutionEntry.setTableName(newTableName)
       timeStamp = System.currentTimeMillis()
       schemaEvolutionEntry.setTime_stamp(timeStamp)
-      val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName,
+      val newCarbonTableIdentifier = new CarbonTableIdentifier(oldDatabaseName,
         newTableName, carbonTable.getCarbonTableIdentifier.getTableId)
       val oldIdentifier = TableIdentifier(oldTableName, Some(oldDatabaseName))
       val newIdentifier = TableIdentifier(newTableName, Some(oldDatabaseName))
@@ -133,17 +133,17 @@ private[sql] case class CarbonAlterTableRenameCommand(
       var partitions: Seq[CatalogTablePartition] = Seq.empty
       if (carbonTable.isHivePartitionTable) {
         partitions =
-          sparkSession.sessionState.catalog.listPartitions(oldIdentifier)
+          sparkSession.sessionState.catalog.listPartitions(oldTableIdentifier)
       }
-      sparkSession.catalog.refreshTable(oldIdentifier.quotedString)
+      sparkSession.catalog.refreshTable(oldTableIdentifier.quotedString)
       sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].alterTableRename(
-          oldIdentifier,
-          newIdentifier,
-        oldTableIdentifier.getTablePath)
+          oldTableIdentifier,
+          newTableIdentifier,
+        oldAbsoluteTableIdentifier.getTablePath)
       hiveRenameSuccess = true
 
       metastore.updateTableSchemaForAlter(
-        newTableIdentifier,
+        newCarbonTableIdentifier,
         carbonTable.getCarbonTableIdentifier,
         tableInfo,
         schemaEvolutionEntry,
@@ -157,11 +157,11 @@ private[sql] case class CarbonAlterTableRenameCommand(
       val alterTableRenamePostEvent: AlterTableRenamePostEvent = AlterTableRenamePostEvent(
         carbonTable,
         alterTableRenameModel,
-        oldTableIdentifier.getTablePath,
+        oldAbsoluteTableIdentifier.getTablePath,
         sparkSession)
       OperationListenerBus.getInstance().fireEvent(alterTableRenamePostEvent, operationContext)
 
-      sparkSession.catalog.refreshTable(newIdentifier.quotedString)
+      sparkSession.catalog.refreshTable(newTableIdentifier.quotedString)
       LOGGER.info(s"Table $oldTableName has been successfully renamed to $newTableName")
     } catch {
       case e: ConcurrentOperationException =>
@@ -171,7 +171,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
           sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].alterTableRename(
             newTableIdentifier,
             oldTableIdentifier,
-            carbonTable.getAbsoluteTableIdentifier.getTableName)
+            carbonTable.getAbsoluteTableIdentifier.getTablePath)
         }
         if (carbonTable != null) {
           AlterTableUtil.revertRenameTableChanges(


[carbondata] 10/27: [CARBONDATA-3267]Fixed Range Sort OOM Issue

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit 99c2e504d8d9e3c5fed76024686d3ce0eccffae6
Author: kumarvishal09 <ku...@gmail.com>
AuthorDate: Tue Jan 22 22:37:26 2019 +0530

    [CARBONDATA-3267]Fixed Range Sort OOM Issue
    
    Problem:
    Range sort is failing with OOM.
    
    Root cause:
    This is because UnsafeSortStorageMemory is not able to control the off heap memory because of this when huge data is loaded it OOM exception is coming fron UnsafeMemoryAllocator.allocate.
    
    Solution:
    Added code code to control Sort Storage memory. After sorting the rows if memory is available then only add sorted records to sort storage memory otherwise write to disk
    
    This closes #3096
---
 .../carbondata/core/memory/IntPointerBuffer.java   | 10 +--
 .../core/memory/UnsafeSortMemoryManager.java       | 77 +++-------------------
 .../loading/sort/unsafe/UnsafeCarbonRowPage.java   |  9 +--
 .../loading/sort/unsafe/UnsafeSortDataRows.java    | 28 ++++----
 .../unsafe/merger/UnsafeIntermediateMerger.java    |  4 +-
 .../processing/sort/sortdata/SortParameters.java   |  1 -
 6 files changed, 29 insertions(+), 100 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/memory/IntPointerBuffer.java b/core/src/main/java/org/apache/carbondata/core/memory/IntPointerBuffer.java
index c596b08..1f1c865 100644
--- a/core/src/main/java/org/apache/carbondata/core/memory/IntPointerBuffer.java
+++ b/core/src/main/java/org/apache/carbondata/core/memory/IntPointerBuffer.java
@@ -75,17 +75,17 @@ public class IntPointerBuffer {
   }
 
   public void loadToUnsafe() {
-    try {
-      pointerMemoryBlock =
-          UnsafeSortMemoryManager.allocateMemoryWithRetry(this.taskId, pointerBlock.length * 4);
+    pointerMemoryBlock =
+        UnsafeSortMemoryManager.INSTANCE.allocateMemory(this.taskId, pointerBlock.length * 4);
+    // pointerMemoryBlock it means sort storage memory manager does not have space to loaf pointer
+    // buffer in that case use pointerBlock
+    if (null != pointerMemoryBlock) {
       for (int i = 0; i < pointerBlock.length; i++) {
         CarbonUnsafe.getUnsafe()
             .putInt(pointerMemoryBlock.getBaseObject(), pointerMemoryBlock.getBaseOffset() + i * 4,
                 pointerBlock[i]);
       }
       pointerBlock = null;
-    } catch (MemoryException e) {
-      LOGGER.warn("Not enough memory for allocating pointer buffer, sorting in heap");
     }
   }
 
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java
index 847f6e2..3c12733 100644
--- a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java
@@ -112,17 +112,6 @@ public class UnsafeSortMemoryManager {
   }
 
   /**
-   * Below method will be used to check whether memory required is
-   * available or not
-   *
-   * @param required
-   * @return if memory available
-   */
-  public synchronized boolean isMemoryAvailable(long required) {
-    return memoryUsed + required < totalMemory;
-  }
-
-  /**
    * total usable memory for sort memory manager
    * @return size in bytes
    */
@@ -130,21 +119,6 @@ public class UnsafeSortMemoryManager {
     return totalMemory;
   }
 
-  /**
-   * Below method will be used to allocate dummy memory
-   * this will be used to allocate first and then used when u need
-   *
-   * @param size
-   */
-  public synchronized void allocateDummyMemory(long size) {
-    memoryUsed += size;
-    if (LOGGER.isDebugEnabled()) {
-      LOGGER.debug(String.format(
-          "Sort Memory block is created with size %d. Total memory used %d Bytes, left %d Bytes",
-          size, memoryUsed, totalMemory - memoryUsed));
-    }
-  }
-
   public synchronized void freeMemory(String taskId, MemoryBlock memoryBlock) {
     if (taskIdToMemoryBlockMap.containsKey(taskId)) {
       taskIdToMemoryBlockMap.get(taskId).remove(memoryBlock);
@@ -195,52 +169,17 @@ public class UnsafeSortMemoryManager {
   }
 
   /**
-   * Before calling this method caller should call allocateMemoryDummy
-   * This method will be used to allocate the memory, this can be used
-   * when caller wants to allocate memory first and used it anytime
-   * @param taskId
-   * @param memoryRequested
-   * @return memory block
-   */
-  public synchronized MemoryBlock allocateMemoryLazy(String taskId, long memoryRequested) {
-    MemoryBlock allocate = allocator.allocate(memoryRequested);
-    Set<MemoryBlock> listOfMemoryBlock = taskIdToMemoryBlockMap.get(taskId);
-    if (null == listOfMemoryBlock) {
-      listOfMemoryBlock = new HashSet<>();
-      taskIdToMemoryBlockMap.put(taskId, listOfMemoryBlock);
-    }
-    listOfMemoryBlock.add(allocate);
-    return allocate;
-  }
-
-  /**
-   * It tries to allocate memory of `size` bytes, keep retry until it allocates successfully.
+   * Below method will be used to check whether memory required is
+   * available or not
+   *
+   * @param required
+   * @return if memory available
    */
-  public static MemoryBlock allocateMemoryWithRetry(String taskId, long size)
-          throws MemoryException {
-    MemoryBlock baseBlock = null;
-    int tries = 0;
-    while (tries < 100) {
-      baseBlock = INSTANCE.allocateMemory(taskId, size);
-      if (baseBlock == null) {
-        try {
-          Thread.sleep(50);
-        } catch (InterruptedException e) {
-          throw new MemoryException(e);
-        }
-      } else {
-        break;
-      }
-      tries++;
-    }
-    if (baseBlock == null) {
-      throw new MemoryException("Not enough sort memory, please increase "
-          + CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB);
-    }
-    return baseBlock;
+  public synchronized boolean isMemoryAvailable(long required) {
+    return memoryUsed + required < totalMemory;
   }
 
-  private synchronized MemoryBlock allocateMemory(String taskId, long memoryRequested) {
+  public synchronized MemoryBlock allocateMemory(String taskId, long memoryRequested) {
     if (memoryUsed + memoryRequested <= totalMemory) {
       MemoryBlock allocate = allocator.allocate(memoryRequested);
       memoryUsed += allocate.size();
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
index a480cf7..21403b0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
@@ -42,8 +42,6 @@ public class UnsafeCarbonRowPage {
 
   private MemoryBlock dataBlock;
 
-  private boolean saveToDisk;
-
   private MemoryManagerType managerType;
 
   private String taskId;
@@ -53,10 +51,9 @@ public class UnsafeCarbonRowPage {
   private boolean convertNoSortFields;
 
   public UnsafeCarbonRowPage(TableFieldStat tableFieldStat, MemoryBlock memoryBlock,
-      boolean saveToDisk, String taskId) {
+      String taskId) {
     this.tableFieldStat = tableFieldStat;
     this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat);
-    this.saveToDisk = saveToDisk;
     this.taskId = taskId;
     buffer = new IntPointerBuffer(this.taskId);
     this.dataBlock = memoryBlock;
@@ -126,10 +123,6 @@ public class UnsafeCarbonRowPage {
     }
   }
 
-  public boolean isSaveToDisk() {
-    return saveToDisk;
-  }
-
   public IntPointerBuffer getBuffer() {
     return buffer;
   }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
index 24faa51..e8e1c08 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
@@ -148,13 +148,10 @@ public class UnsafeSortDataRows {
         UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, inMemoryChunkSize);
     boolean isMemoryAvailable =
         UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(baseBlock.size());
-    if (isMemoryAvailable) {
-      UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(baseBlock.size());
-    } else {
-      // merge and spill in-memory pages to disk if memory is not enough
-      unsafeInMemoryIntermediateFileMerger.tryTriggerInmemoryMerging(true);
+    if (!isMemoryAvailable) {
+      unsafeInMemoryIntermediateFileMerger.tryTriggerInMemoryMerging(true);
     }
-    return new UnsafeCarbonRowPage(tableFieldStat, baseBlock, !isMemoryAvailable, taskId);
+    return new UnsafeCarbonRowPage(tableFieldStat, baseBlock, taskId);
   }
 
   public boolean canAdd() {
@@ -382,7 +379,12 @@ public class UnsafeSortDataRows {
           timSort.sort(page.getBuffer(), 0, page.getBuffer().getActualSize(),
               new UnsafeRowComparatorForNormalDims(page));
         }
-        if (page.isSaveToDisk()) {
+        // get sort storage memory block if memory is available in sort storage manager
+        // if space is available then store it in memory, if memory is not available
+        // then spill to disk
+        MemoryBlock sortStorageMemoryBlock =
+            UnsafeSortMemoryManager.INSTANCE.allocateMemory(taskId, page.getDataBlock().size());
+        if (null == sortStorageMemoryBlock) {
           // create a new file every time
           // create a new file and pick a temp directory randomly every time
           String tmpDir = parameters.getTempFileLocation()[
@@ -400,18 +402,14 @@ public class UnsafeSortDataRows {
           // intermediate merging of sort temp files will be triggered
           unsafeInMemoryIntermediateFileMerger.addFileToMerge(sortTempFile);
         } else {
-          // creating a new memory block as size is already allocated
-          // so calling lazy memory allocator
-          MemoryBlock newMemoryBlock = UnsafeSortMemoryManager.INSTANCE
-              .allocateMemoryLazy(taskId, page.getDataBlock().size());
-          // copying data from working memory manager to sortmemory manager
+          // copying data from working memory manager block to storage memory manager block
           CarbonUnsafe.getUnsafe()
               .copyMemory(page.getDataBlock().getBaseObject(), page.getDataBlock().getBaseOffset(),
-                  newMemoryBlock.getBaseObject(), newMemoryBlock.getBaseOffset(),
-                  page.getDataBlock().size());
+                  sortStorageMemoryBlock.getBaseObject(),
+                  sortStorageMemoryBlock.getBaseOffset(), page.getDataBlock().size());
           // free unsafememory manager
           page.freeMemory();
-          page.setNewDataBlock(newMemoryBlock);
+          page.setNewDataBlock(sortStorageMemoryBlock);
           // add sort temp filename to and arrayList. When the list size reaches 20 then
           // intermediate merging of sort temp files will be triggered
           page.getBuffer().loadToUnsafe();
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
index f972f0c..1389ff7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
@@ -140,7 +140,7 @@ public class UnsafeIntermediateMerger {
     mergerTask.add(executorService.submit(merger));
   }
 
-  public void tryTriggerInmemoryMerging(boolean spillDisk)
+  public void tryTriggerInMemoryMerging(boolean spillDisk)
       throws CarbonSortKeyAndGroupByException {
     List<UnsafeCarbonRowPage> pages2Merge = new ArrayList<>();
     int totalRows2Merge = 0;
@@ -170,7 +170,7 @@ public class UnsafeIntermediateMerger {
 
   public void startInmemoryMergingIfPossible() throws CarbonSortKeyAndGroupByException {
     if (rowPages.size() >= parameters.getNumberOfIntermediateFileToBeMerged()) {
-      tryTriggerInmemoryMerging(false);
+      tryTriggerInMemoryMerging(false);
     }
   }
 
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
index 09dd52f..6fec8dc 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
@@ -441,7 +441,6 @@ public class SortParameters implements Serializable {
 
     parameters.setTempFileLocation(sortTempDirs);
     LOGGER.info("temp file location: " + StringUtils.join(parameters.getTempFileLocation(), ","));
-
     int numberOfCores = 1;
     // In case of loading from partition we should use the cores specified by it
     if (configuration.getWritingCoresCount() > 0) {


[carbondata] 02/27: [CARBONDATA-3259] Added documentation for new complex delimiters

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit 42e30b5d12a33eb68c8da419dd1a841fdd288e29
Author: manishnalla1994 <ma...@gmail.com>
AuthorDate: Mon Jan 21 10:30:29 2019 +0530

    [CARBONDATA-3259] Added documentation for new complex delimiters
    
    Change of Level_1 and Level_2 Default Complex Delimiters to '\001'
    and '\002' and added new Level_3 delimiter as '\003' in documentation.
    
    This closes #3086
---
 docs/dml-of-carbondata.md | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/docs/dml-of-carbondata.md b/docs/dml-of-carbondata.md
index d26cf19..c8d72ef 100644
--- a/docs/dml-of-carbondata.md
+++ b/docs/dml-of-carbondata.md
@@ -135,7 +135,7 @@ CarbonData DML statements are documented here,which includes:
     Split the complex type data column in a row (eg., a\001b\001c --> Array = {a,b,c}).
 
     ```
-    OPTIONS('COMPLEX_DELIMITER_LEVEL_1'='\\\001')
+    OPTIONS('COMPLEX_DELIMITER_LEVEL_1'='\001')
     ```
 
   - ##### COMPLEX_DELIMITER_LEVEL_2:
@@ -143,7 +143,16 @@ CarbonData DML statements are documented here,which includes:
     Split the complex type nested data column in a row. Applies level_1 delimiter & applies level_2 based on complex data type (eg., a\002b\001c\002d --> Array> = {{a,b},{c,d}}).
 
     ```
-    OPTIONS('COMPLEX_DELIMITER_LEVEL_2'='\\\002')
+    OPTIONS('COMPLEX_DELIMITER_LEVEL_2'='\002')
+    ```
+
+  - ##### COMPLEX_DELIMITER_LEVEL_3:
+
+    Split the complex type nested data column in a row. Applies level_1 delimiter, applies level_2 and then level_3 delimiter based on complex data type.
+     Used in case of nested Complex Map type. (eg., 'a\003b\002b\003c\001aa\003bb\002cc\003dd' --> Array Of Map> = {{a -> b, b -> c},{aa -> bb, cc -> dd}}).
+
+    ```
+    OPTIONS('COMPLEX_DELIMITER_LEVEL_3'='\003')
     ```
 
   - ##### ALL_DICTIONARY_PATH:


[carbondata] 06/27: [DOC] Updated Presto guide

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit e0146e6c6e191ca325bb4a46bb2231b5df04c07f
Author: ravipesala <ra...@gmail.com>
AuthorDate: Mon Jan 21 12:27:40 2019 +0530

    [DOC] Updated Presto guide
    
    Added more information about CarbonData configurations for Presto and clearly highlighted the sections in the document
    
    This closes #3088
---
 docs/presto-guide.md | 57 +++++++++++++++++++++++++---------------------------
 1 file changed, 27 insertions(+), 30 deletions(-)

diff --git a/docs/presto-guide.md b/docs/presto-guide.md
index 8b3a527..054f29f 100644
--- a/docs/presto-guide.md
+++ b/docs/presto-guide.md
@@ -101,6 +101,7 @@ This tutorial provides a quick introduction to using current integration/presto
   ```
 The options `node-scheduler.include-coordinator=false` and `coordinator=true` indicate that the node is the coordinator and tells the coordinator not to do any of the computation work itself and to use the workers.
 
+
 **Note**: We recommend setting `query.max-memory-per-node` to half of the JVM config max memory, though if your workload is highly concurrent, you may want to use a lower value for `query.max-memory-per-node`.
 
 Also relation between below two configuration-properties should be like:
@@ -167,8 +168,10 @@ Now you can use the Presto CLI on the coordinator to query data sources in the c
 * Download presto server (0.210 is suggested and supported) : https://repo1.maven.org/maven2/com/facebook/presto/presto-server/
 * Finish presto configuration following https://prestodb.io/docs/current/installation/deployment.html.
   A configuration example:
+  
+ **config.properties**
+  
   ```
-  config.properties:
   coordinator=true
   node-scheduler.include-coordinator=true
   http-server.http.port=8086
@@ -180,10 +183,13 @@ Now you can use the Presto CLI on the coordinator to query data sources in the c
   discovery.uri=http://localhost:8086
   task.max-worker-threads=4
   optimizer.dictionary-aggregation=true
-  optimizer.optimize-hash-generation = false
+  optimizer.optimize-hash-generation = false  
+  ``` 
  
   
-  jvm.config:
+  **jvm.config**
+  
+  ```
   -server
   -Xmx4G
   -XX:+UseG1GC
@@ -193,12 +199,20 @@ Now you can use the Presto CLI on the coordinator to query data sources in the c
   -XX:+HeapDumpOnOutOfMemoryError
   -XX:OnOutOfMemoryError=kill -9 %p
   -XX:+TraceClassLoading
+  -Dcarbon.properties.filepath=<path>/carbon.properties
   
-  log.properties:
+  ```
+  `carbon.properties.filepath` property is used to set the carbon.properties file path and it is recommended to set otherwise some features may not work. Please check the above example.
+  
+  
+  **log.properties**
+  ```
   com.facebook.presto=DEBUG
   com.facebook.presto.server.PluginManager=DEBUG
+  ```
   
-  node.properties:
+  **node.properties**
+  ```
   node.environment=carbondata
   node.id=ffffffff-ffff-ffff-ffff-ffffffffffff
   node.data-dir=/Users/apple/DEMO/presto_test/data
@@ -220,21 +234,20 @@ Now you can use the Presto CLI on the coordinator to query data sources in the c
   Secondly: Create a folder named 'carbondata' under $PRESTO_HOME$/plugin and
   copy all jars from carbondata/integration/presto/target/carbondata-presto-x.x.x-SNAPSHOT
         to $PRESTO_HOME$/plugin/carbondata
+ 
   **NOTE:**  Copying assemble jar alone will not work, need to copy all jars from integration/presto/target/carbondata-presto-x.x.x-SNAPSHOT
   
   Thirdly: Create a carbondata.properties file under $PRESTO_HOME$/etc/catalog/ containing the following contents:
   ```
   connector.name=carbondata
-  carbondata-store={schema-store-path}
-  enable.unsafe.in.query.processing=false
-  carbon.unsafe.working.memory.in.mb={value}
-  enable.unsafe.columnpage=false
-  enable.unsafe.sort=false
-
+  hive.metastore.uri=thrift://<host>:<port>
   ```
-  Replace the schema-store-path with the absolute path of the parent directory of the schema.
-  For example, if you have a schema named 'default' stored in hdfs://namenode:9000/test/carbondata/,
-  Then set carbondata-store=hdfs://namenode:9000/test/carbondata
+  Carbondata becomes one of the supported format of presto hive plugin, so the configurations and setup is similar to hive connector of presto.
+  Please refer <a>https://prestodb.io/docs/current/connector/hive.html</a> for more details.
+  
+  **Note**: Since carbon can work only with hive metastore, it is necessary that spark also connects to same metastore db for creating tables and updating tables.
+  All the operations done on spark will be reflected in presto immediately. 
+  It is mandatory to create Carbon tables from spark using CarbonData 1.5.2 or greater version since input/output formats are updated in carbon table properly from this version. 
   
 #### Connecting to carbondata store on s3
  * In case you want to query carbonstore on S3 using S3A api put following additional properties inside $PRESTO_HOME$/etc/catalog/carbondata.properties 
@@ -258,23 +271,7 @@ Now you can use the Presto CLI on the coordinator to query data sources in the c
         fs.s3n.awsAccessKeyId={value}
         fs.s3n.awsSecretAccessKey={value}
      ```
-     
-    Replace the schema-store-path with the absolute path of the parent directory of the schema.
-    For example, if you have a schema named 'default' stored in a bucket s3a://s3-carbon/store,
-    Then set carbondata-store=s3a://s3-carbon/store
     
-####  Unsafe Properties    
-  enable.unsafe.in.query.processing property by default is true in CarbonData system, the carbon.unsafe.working.memory.in.mb 
-  property defines the limit for Unsafe Memory usage in Mega Bytes, the default value is 512 MB.
-  Currently Presto does not support Unsafe Memory so we have to disable the unsafe feature by setting below properties to false.
-
-  enable.unsafe.in.query.processing=false.
-  enable.unsafe.columnpage=false
-  enable.unsafe.sort=false
-
-  If you updated the jar balls or configuration files, make sure you have dispatched them
-   to all the presto nodes and restarted the presto servers on the nodes. The updates will not take effect before restarting.
-  
 ### Generate CarbonData file
 
 Please refer to quick start: https://github.com/apache/carbondata/blob/master/docs/quick-start-guide.md.


[carbondata] 24/27: [CARBONDATA-3282] set hadoop conf to thread local for file factory usage in presto carbon

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit cd9d7b34d6c61d3767bc7c3723f40fe801fd3c56
Author: ajantha-bhat <aj...@gmail.com>
AuthorDate: Tue Jan 29 18:39:49 2019 +0530

    [CARBONDATA-3282] set hadoop conf to thread local for file factory usage in presto carbon
    
    [HOTFIX] set hadoop conf to thread local for file factory usage in presto carbon.
    and Added bloom dependency in presto.
    
    This closes #3114
---
 docs/presto-guide.md                                                 | 5 +++++
 integration/presto/pom.xml                                           | 5 +++++
 .../java/org/apache/carbondata/presto/CarbondataSplitManager.java    | 3 +++
 3 files changed, 13 insertions(+)

diff --git a/docs/presto-guide.md b/docs/presto-guide.md
index 7389bc6..6bb8196 100644
--- a/docs/presto-guide.md
+++ b/docs/presto-guide.md
@@ -280,3 +280,8 @@ carbondata files.
   ```
   Replace the hostname, port and schema name with your own.
 
+### Supported features of presto carbon
+Presto carbon only supports reading the carbon table which is written by spark carbon or carbon SDK. 
+During reading, it supports the non-distributed datamaps like block datamap and bloom datamap.
+It doesn't support MV datamap and Pre-aggregate datamap as it needs query plan to be changed and presto does not allow it.
+Also Presto carbon supports streaming segment read from streaming table created by spark.
diff --git a/integration/presto/pom.xml b/integration/presto/pom.xml
index 0484781..3d52041 100644
--- a/integration/presto/pom.xml
+++ b/integration/presto/pom.xml
@@ -551,6 +551,11 @@
       <artifactId>httpclient</artifactId>
       <version>4.5.5</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-bloom</artifactId>
+      <version>${project.version}</version>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
index 6efef93..0902058 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
@@ -30,6 +30,7 @@ import org.apache.carbondata.core.stats.QueryStatistic;
 import org.apache.carbondata.core.stats.QueryStatisticsConstants;
 import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
 import org.apache.carbondata.presto.impl.CarbonLocalMultiBlockSplit;
 import org.apache.carbondata.presto.impl.CarbonTableCacheModel;
 import org.apache.carbondata.presto.impl.CarbonTableReader;
@@ -117,6 +118,8 @@ public class CarbondataSplitManager extends HiveSplitManager {
         new HdfsEnvironment.HdfsContext(session, schemaTableName.getSchemaName(),
             schemaTableName.getTableName()), new Path(location));
     configuration = carbonTableReader.updateS3Properties(configuration);
+    // set the hadoop configuration to thread local, so that FileFactory can use it.
+    ThreadLocalSessionInfo.setConfigurationToCurrentThread(configuration);
     CarbonTableCacheModel cache =
         carbonTableReader.getCarbonCache(schemaTableName, location, configuration);
     Expression filters = PrestoFilterUtil.parseFilterExpression(predicate);


[carbondata] 05/27: [CARBONDATA-3246]Fix sdk reader issue if batch size is given as zero and vectorRead False.

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit 6c466cd4251eff992a9b8a0f5b6d2a45b566e408
Author: shardul-cr7 <sh...@gmail.com>
AuthorDate: Wed Jan 23 11:31:04 2019 +0530

    [CARBONDATA-3246]Fix sdk reader issue if batch size is given as zero and vectorRead False.
    
    Problem: SDK reader is failing if vectorRead is false and detail query batch size is given as 0.
    Compiler is giving stack overflow error after getting stuck in ChunkRowIterator.hasnext recurssion.
    
    Solution: Since 0 is wrong batch size, we should take DETAIL_QUERY_BATCH_SIZE_DEFAULT as the batch size
    
    This closes #3097
---
 .../core/constants/CarbonCommonConstants.java      | 10 ++++++
 .../AbstractDetailQueryResultIterator.java         |  3 --
 .../carbondata/core/util/CarbonProperties.java     | 39 ++++++++++++++++++++-
 docs/configuration-parameters.md                   |  2 +-
 .../carbondata/sdk/file/CarbonReaderTest.java      | 23 +++++++++++++
 .../sdk/file/ConcurrentSdkReaderTest.java          | 40 ----------------------
 6 files changed, 72 insertions(+), 45 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index ccc8b99..b7d9761 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1228,6 +1228,16 @@ public final class CarbonCommonConstants {
   public static final int DETAIL_QUERY_BATCH_SIZE_DEFAULT = 100;
 
   /**
+   * Maximum batch size of carbon.detail.batch.size property
+   */
+  public static final int DETAIL_QUERY_BATCH_SIZE_MAX = 1000;
+
+  /**
+   * Minimum batch size of carbon.detail.batch.size property
+   */
+  public static final int DETAIL_QUERY_BATCH_SIZE_MIN = 100;
+
+  /**
    * max driver lru cache size upto which lru cache will be loaded in memory
    */
   @CarbonProperty
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
index 30f5183..9282d44 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
@@ -94,9 +94,6 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
     if (null != batchSizeString) {
       try {
         batchSize = Integer.parseInt(batchSizeString);
-        if (0 == batchSize) {
-          batchSize = CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE_DEFAULT;
-        }
       } catch (NumberFormatException ne) {
         LOGGER.error("Invalid inmemory records size. Using default value");
         batchSize = CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE_DEFAULT;
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index f9131f5..49388b7 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -56,6 +56,10 @@ import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_MERGE_FILES;
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT;
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.CSV_READ_BUFFER_SIZE;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE_DEFAULT;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE_MAX;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE_MIN;
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_AUTO_HANDOFF;
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_OFFHEAP_SORT;
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_UNSAFE_SORT;
@@ -192,6 +196,9 @@ public final class CarbonProperties {
       case CARBON_MINMAX_ALLOWED_BYTE_COUNT:
         validateStringCharacterLimit();
         break;
+      case DETAIL_QUERY_BATCH_SIZE:
+        validateDetailQueryBatchSize();
+        break;
       // TODO : Validation for carbon.lock.type should be handled for addProperty flow
       default:
         // none
@@ -256,6 +263,7 @@ public final class CarbonProperties {
     validateEnableQueryStatistics();
     validateSortMemorySpillPercentage();
     validateStringCharacterLimit();
+    validateDetailQueryBatchSize();
   }
 
   /**
@@ -1547,5 +1555,34 @@ public final class CarbonProperties {
     }
   }
 
-
+  /**
+   * This method validates the DETAIL_QUERY_BATCH_SIZE. If some invalid input is set, we use the
+   * default value for this property
+   */
+  private void validateDetailQueryBatchSize() {
+    String batchSizeString =
+        carbonProperties.getProperty(DETAIL_QUERY_BATCH_SIZE);
+    if (batchSizeString == null) {
+      carbonProperties.setProperty(DETAIL_QUERY_BATCH_SIZE,
+          Integer.toString(DETAIL_QUERY_BATCH_SIZE_DEFAULT));
+      LOGGER.info(
+          "Using default value for carbon.detail.batch.size " + DETAIL_QUERY_BATCH_SIZE_DEFAULT);
+    } else {
+      int batchSize;
+      try {
+        batchSize = Integer.parseInt(batchSizeString);
+        if (batchSize < DETAIL_QUERY_BATCH_SIZE_MIN || batchSize > DETAIL_QUERY_BATCH_SIZE_MAX) {
+          LOGGER.info("Invalid carbon.detail.batch.size.Using default value "
+              + DETAIL_QUERY_BATCH_SIZE_DEFAULT);
+          carbonProperties.setProperty(DETAIL_QUERY_BATCH_SIZE,
+              Integer.toString(DETAIL_QUERY_BATCH_SIZE_DEFAULT));
+        }
+      } catch (NumberFormatException ne) {
+        LOGGER.info("Invalid carbon.detail.batch.size.Using default value "
+            + DETAIL_QUERY_BATCH_SIZE_DEFAULT);
+        carbonProperties.setProperty(DETAIL_QUERY_BATCH_SIZE,
+            Integer.toString(DETAIL_QUERY_BATCH_SIZE_DEFAULT));
+      }
+    }
+  }
 }
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index c7d8152..d28ad61 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -129,7 +129,7 @@ This section provides the details of all the configurations required for the Car
 | carbon.search.master.port | 10020 | Port on which the search master listens for incoming query requests |
 | carbon.search.worker.port | 10021 | Port on which search master communicates with the workers. |
 | carbon.search.worker.workload.limit | 10 * *carbon.search.scan.thread* | Maximum number of active requests that can be sent to a worker. Beyond which the request needs to be rescheduled for later time or to a different worker. |
-| carbon.detail.batch.size | 100 | The buffer size to store records, returned from the block scan. In limit scenario this parameter is very important. For example your query limit is 1000. But if we set this value to 3000 that means we get 3000 records from scan but spark will only take 1000 rows. So the 2000 remaining are useless. In one Finance test case after we set it to 100, in the limit 1000 scenario the performance increase about 2 times in comparison to if we set this value to 12000. |
+| carbon.detail.batch.size | 100 | The buffer size to store records, returned from the block scan. In limit scenario this parameter is very important. For example your query limit is 1000. But if we set this value to 3000 that means we get 3000 records from scan but spark will only take 1000 rows. So the 2000 remaining are useless. In one Finance test case after we set it to 100, in the limit 1000 scenario the performance increase about 2 times in comparison to if we set this value to 12 [...]
 | carbon.enable.vector.reader | true | Spark added vector processing to optimize cpu cache miss and there by increase the query performance. This configuration enables to fetch data as columnar batch of size 4*1024 rows instead of fetching data row by row and provide it to spark so that there is improvement in  select queries performance. |
 | carbon.task.distribution | block | CarbonData has its own scheduling algorithm to suggest to Spark on how many tasks needs to be launched and how much work each task need to do in a Spark cluster for any query on CarbonData. Each of these task distribution suggestions has its own advantages and disadvantages. Based on the customer use case, appropriate task distribution can be configured.**block**: Setting this value will launch one task per block. This setting is suggested in case of  [...]
 | carbon.custom.block.distribution | false | CarbonData has its own scheduling algorithm to suggest to Spark on how many tasks needs to be launched and how much work each task need to do in a Spark cluster for any query on CarbonData. When this configuration is true, CarbonData would distribute the available blocks to be scanned among the available number of cores. For Example:If there are 10 blocks to be scanned and only 3 tasks can be run(only 3 executor cores available in the cluster) [...]
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
index acd9e5a..28944da 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
@@ -104,6 +104,29 @@ public class CarbonReaderTest extends TestCase {
     FileUtils.deleteDirectory(new File(path));
   }
 
+  @Test public void testReadWithZeroBatchSize() throws IOException, InterruptedException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+    DataMapStoreManager.getInstance().clearDataMaps(AbsoluteTableIdentifier.from(path));
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    TestUtil.writeFilesAndVerify(10, new Schema(fields), path);
+    CarbonReader reader;
+    reader = CarbonReader.builder(path).withRowRecordReader().withBatch(0).build();
+
+    int i = 0;
+    while (reader.hasNext()) {
+      Object[] row = (Object[]) reader.readNextRow();
+      Assert.assertEquals(("robot" + (i % 10)), row[0]);
+      Assert.assertEquals(i, row[1]);
+      i++;
+    }
+    Assert.assertEquals(i, 10);
+    FileUtils.deleteDirectory(new File(path));
+  }
+
   @Test
   public void testReadWithFilterOfNonTransactionalSimple() throws IOException, InterruptedException {
     String path = "./testWriteFiles";
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkReaderTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkReaderTest.java
index 31342b9..c75b70f 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkReaderTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkReaderTest.java
@@ -131,46 +131,6 @@ public class ConcurrentSdkReaderTest {
       executorService.awaitTermination(10, TimeUnit.MINUTES);
     }
   }
-
-  @Test public void testReadWithZeroBatchSize() throws InterruptedException {
-    int numFiles = 5;
-    int numRowsPerFile = 5;
-    short numThreads = 4;
-    writeDataMultipleFiles(numFiles, numRowsPerFile);
-
-    // Concurrent Reading
-    ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
-    try {
-      long count;
-      CarbonReader reader =
-          CarbonReader.builder(dataDir).withRowRecordReader().withBatch(0).build();
-      List<CarbonReader> multipleReaders = reader.split(numThreads);
-      try {
-        List<ReadLogic> tasks = new ArrayList<>();
-        List<Future<Long>> results;
-        count = 0;
-
-        for (CarbonReader reader_i : multipleReaders) {
-          tasks.add(new ReadLogic(reader_i));
-        }
-        results = executorService.invokeAll(tasks);
-        for (Future result_i : results) {
-          count += (long) result_i.get();
-        }
-        Assert.assertEquals(numFiles * numRowsPerFile, count);
-      } catch (Exception e) {
-        e.printStackTrace();
-        Assert.fail(e.getMessage());
-      }
-    } catch (Exception e) {
-      e.printStackTrace();
-      Assert.fail(e.getMessage());
-    } finally {
-      executorService.shutdown();
-      executorService.awaitTermination(10, TimeUnit.MINUTES);
-    }
-  }
-
   class ReadLogic implements Callable<Long> {
     CarbonReader reader;
 


[carbondata] 21/27: [CARBONDATA-3272]fix ArrayIndexOutOfBoundsException of horizontal compaction during update, when cardinality changes within a segment

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit b71731cb7d39c83bf28a2fb35950d5132dd27fde
Author: akashrn5 <ak...@gmail.com>
AuthorDate: Fri Jan 25 14:35:32 2019 +0530

    [CARBONDATA-3272]fix ArrayIndexOutOfBoundsException of horizontal compaction during update, when cardinality changes within a segment
    
    Problem:
    During horizontal compaction in update, we prepare a taskBlockMapping to get the resultIterators. horizontal compaction will be done within a segment. Here, source segment properties will be always prepared by the filefooter of first block in the blocklist for a corresponding task. source segment properties will contain the dimensionKeyGenerator which will be used to convert the rows. If the cardinality is different for two blcoks for a task, then the dimensionKeyGenerator will be dif [...]
    
    Solution
    so get all the blocks present in a task and then split into multiple lists of same key length and create separate RawResultIterator for each list of same key length. If all the blocks have same keylength, then make a single RawResultIterator for all the blocks
    
    This closes #3102
---
 .../core/scan/wrappers/IntArrayWrapper.java        | 47 +++++++++++++
 .../merger/CarbonCompactionExecutor.java           | 77 +++++++++++++++++-----
 .../processing/merger/CarbonCompactionUtil.java    |  2 +-
 3 files changed, 109 insertions(+), 17 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/scan/wrappers/IntArrayWrapper.java b/core/src/main/java/org/apache/carbondata/core/scan/wrappers/IntArrayWrapper.java
new file mode 100644
index 0000000..c1a75d5
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/wrappers/IntArrayWrapper.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.wrappers;
+
+import java.util.Arrays;
+
+/**
+ * Wrapper class for int[] data
+ */
+public class IntArrayWrapper {
+
+  private final int[] data;
+
+  public IntArrayWrapper(int[] data) {
+    this.data = data;
+  }
+
+  @Override public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    IntArrayWrapper that = (IntArrayWrapper) o;
+    return Arrays.equals(data, that.data);
+  }
+
+  @Override public int hashCode() {
+    return Arrays.hashCode(data);
+  }
+}
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
index 79b66e2..5961cd7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
@@ -41,6 +41,7 @@ import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.scan.model.QueryModelBuilder;
 import org.apache.carbondata.core.scan.result.RowBatch;
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
+import org.apache.carbondata.core.scan.wrappers.IntArrayWrapper;
 import org.apache.carbondata.core.stats.QueryStatistic;
 import org.apache.carbondata.core.stats.QueryStatisticsConstants;
 import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
@@ -117,7 +118,7 @@ public class CarbonCompactionExecutor {
     resultList.put(CarbonCompactionUtil.SORTED_IDX,
         new ArrayList<RawResultIterator>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE));
 
-    List<TableBlockInfo> list = null;
+    List<TableBlockInfo> tableBlockInfos = null;
     QueryModelBuilder builder = new QueryModelBuilder(carbonTable)
         .projectAllColumns()
         .dataConverter(dataTypeConverter)
@@ -130,7 +131,6 @@ public class CarbonCompactionExecutor {
     for (Map.Entry<String, TaskBlockInfo> taskMap : segmentMapping.entrySet()) {
       String segmentId = taskMap.getKey();
       List<DataFileFooter> listMetadata = dataFileMetadataSegMapping.get(segmentId);
-      SegmentProperties sourceSegProperties = getSourceSegmentProperties(listMetadata);
       // for each segment get taskblock info
       TaskBlockInfo taskBlockInfo = taskMap.getValue();
       Set<String> taskBlockListMapping = taskBlockInfo.getTaskSet();
@@ -139,26 +139,71 @@ public class CarbonCompactionExecutor {
           CarbonCompactionUtil.isRestructured(listMetadata, carbonTable.getTableLastUpdatedTime())
               || !CarbonCompactionUtil.isSorted(listMetadata.get(0));
       for (String task : taskBlockListMapping) {
-        list = taskBlockInfo.getTableBlockInfoList(task);
-        Collections.sort(list);
-        LOGGER.info(
-            "for task -" + task + "- in segment id -" + segmentId + "- block size is -" + list
-                .size());
-        queryModel.setTableBlockInfos(list);
-        if (sortingRequired) {
-          resultList.get(CarbonCompactionUtil.UNSORTED_IDX).add(
-              new RawResultIterator(executeBlockList(list, segmentId, task, configuration),
-                  sourceSegProperties, destinationSegProperties, false));
-        } else {
-          resultList.get(CarbonCompactionUtil.SORTED_IDX).add(
-              new RawResultIterator(executeBlockList(list, segmentId, task, configuration),
-                  sourceSegProperties, destinationSegProperties, false));
+        tableBlockInfos = taskBlockInfo.getTableBlockInfoList(task);
+        // during update there may be a chance that the cardinality may change within the segment
+        // which may lead to failure while converting the row, so get all the blocks present in a
+        // task and then split into multiple lists of same column values and create separate
+        // RawResultIterator for each tableBlockInfo of same column values. If all the blocks have
+        // same column values, then make a single RawResultIterator for all the blocks
+        List<List<TableBlockInfo>> listOfTableBlocksBasedOnKeyLength =
+            getListOfTableBlocksBasedOnColumnValueSize(tableBlockInfos);
+        for (List<TableBlockInfo> tableBlockInfoList : listOfTableBlocksBasedOnKeyLength) {
+          Collections.sort(tableBlockInfoList);
+          LOGGER.info("for task -" + task + "- in segment id -" + segmentId + "- block size is -"
+              + tableBlockInfos.size());
+          queryModel.setTableBlockInfos(tableBlockInfoList);
+          if (sortingRequired) {
+            resultList.get(CarbonCompactionUtil.UNSORTED_IDX).add(
+                getRawResultIterator(configuration, segmentId, task, tableBlockInfoList));
+          } else {
+            resultList.get(CarbonCompactionUtil.SORTED_IDX).add(
+                getRawResultIterator(configuration, segmentId, task, tableBlockInfoList));
+          }
         }
       }
     }
     return resultList;
   }
 
+  private RawResultIterator getRawResultIterator(Configuration configuration, String segmentId,
+      String task, List<TableBlockInfo> tableBlockInfoList)
+      throws QueryExecutionException, IOException {
+    return new RawResultIterator(
+        executeBlockList(tableBlockInfoList, segmentId, task, configuration),
+        getSourceSegmentProperties(
+            Collections.singletonList(tableBlockInfoList.get(0).getDataFileFooter())),
+        destinationSegProperties, false);
+  }
+
+  /**
+   * This method returns the List of TableBlockInfoList, where each listOfTableBlockInfos will have
+   * same columnvalues
+   * @param tableBlockInfos List of tableBlockInfos present in each task
+   */
+  private List<List<TableBlockInfo>> getListOfTableBlocksBasedOnColumnValueSize(
+      List<TableBlockInfo> tableBlockInfos) {
+    List<List<TableBlockInfo>> listOfTableBlockInfoListOnColumnvaluesSize = new ArrayList<>();
+    Map<IntArrayWrapper, List<TableBlockInfo>> columnvalueSizeToTableBlockInfoMap = new HashMap<>();
+    for (TableBlockInfo tableBlock : tableBlockInfos) {
+      // get the columnValueSize for the dataFileFooter
+      IntArrayWrapper columnValueSize = new IntArrayWrapper(
+          getSourceSegmentProperties(Collections.singletonList(tableBlock.getDataFileFooter()))
+              .getColumnsValueSize());
+      List<TableBlockInfo> tempBlockInfoList =
+          columnvalueSizeToTableBlockInfoMap.get(columnValueSize);
+      if (tempBlockInfoList == null) {
+        tempBlockInfoList = new ArrayList<>();
+        columnvalueSizeToTableBlockInfoMap.put(columnValueSize, tempBlockInfoList);
+      }
+      tempBlockInfoList.add(tableBlock);
+    }
+    for (Map.Entry<IntArrayWrapper, List<TableBlockInfo>> taskMap :
+        columnvalueSizeToTableBlockInfoMap.entrySet()) {
+      listOfTableBlockInfoListOnColumnvaluesSize.add(taskMap.getValue());
+    }
+    return listOfTableBlockInfoListOnColumnvaluesSize;
+  }
+
   /**
    * This method will create the source segment properties based on restructured block existence
    *
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
index 1bf30b5..ffcfe0c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
@@ -144,10 +144,10 @@ public class CarbonCompactionUtil {
         if (null == dataFileMatadata.isSorted()) {
           dataFileMatadata.setSorted(isSortedTable);
         }
-        blockInfo.setDataFileFooter(dataFileMatadata);
       } else {
         dataFileMatadata = CarbonUtil.readMetadataFile(blockInfo);
       }
+      blockInfo.setDataFileFooter(dataFileMatadata);
       if (null == metadataList) {
         // if it is not present
         eachSegmentBlocks.add(dataFileMatadata);


[carbondata] 07/27: [CARBONDATA-3263] Update doc for RANGE_COLUMN

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit 045a614b151c0a1279f955de32998e753044fce8
Author: QiangCai <qi...@qq.com>
AuthorDate: Tue Jan 22 11:27:04 2019 +0800

    [CARBONDATA-3263] Update doc for RANGE_COLUMN
    
    Added documentation for range_column feature support
    
    This closes #3093
---
 docs/ddl-of-carbondata.md | 12 +++++++++++-
 docs/dml-of-carbondata.md | 23 ++++++++++++++++++++---
 2 files changed, 31 insertions(+), 4 deletions(-)

diff --git a/docs/ddl-of-carbondata.md b/docs/ddl-of-carbondata.md
index a1b0ce7..4f9e47b 100644
--- a/docs/ddl-of-carbondata.md
+++ b/docs/ddl-of-carbondata.md
@@ -34,7 +34,8 @@ CarbonData DDL statements are documented here,which includes:
   * [Extra Long String columns](#string-longer-than-32000-characters)
   * [Compression for Table](#compression-for-table)
   * [Bad Records Path](#bad-records-path) 
-  * [Load Minimum Input File Size](#load-minimum-data-size) 
+  * [Load Minimum Input File Size](#load-minimum-data-size)
+  * [Range Column](#range-column)
 
 * [CREATE TABLE AS SELECT](#create-table-as-select)
 * [CREATE EXTERNAL TABLE](#create-external-table)
@@ -109,6 +110,7 @@ CarbonData DDL statements are documented here,which includes:
 | [BUCKETNUMBER](#bucketing)                                   | Number of buckets to be created                              |
 | [BUCKETCOLUMNS](#bucketing)                                  | Columns which are to be placed in buckets                    |
 | [LOAD_MIN_SIZE_INMB](#load-minimum-data-size)                | Minimum input data size per node for data loading          |
+| [Range Column](#range-column)                                | partition input data by range                              |
 
  Following are the guidelines for TBLPROPERTIES, CarbonData's additional table options can be set via carbon.properties.
 
@@ -495,6 +497,14 @@ CarbonData DDL statements are documented here,which includes:
      TBLPROPERTIES('LOAD_MIN_SIZE_INMB'='256')
      ```
 
+   - ##### Range Column
+     This property is used to specify a column to partition the input data by range.
+     Only one column can be configured. During data loading, you can use "global_sort_partitions" or "scale_factor" to avoid generating small files.
+
+     ```
+     TBLPROPERTIES('RANGE_COLUMN'='col1')
+     ```
+
 ## CREATE TABLE AS SELECT
   This function allows user to create a Carbon table from any of the Parquet/Hive/Carbon table. This is beneficial when the user wants to create Carbon table from any other Parquet/Hive table and use the Carbon query engine to query and achieve better query results for cases where Carbon is faster than other file formats. Also this feature can be used for backing up the data.
 
diff --git a/docs/dml-of-carbondata.md b/docs/dml-of-carbondata.md
index d6e5932..b3fe517 100644
--- a/docs/dml-of-carbondata.md
+++ b/docs/dml-of-carbondata.md
@@ -66,7 +66,8 @@ CarbonData DML statements are documented here,which includes:
 | [BAD_RECORDS_ACTION](#bad-records-handling)             | Behavior of data loading when bad record is found            |
 | [IS_EMPTY_DATA_BAD_RECORD](#bad-records-handling)       | Whether empty data of a column to be considered as bad record or not |
 | [GLOBAL_SORT_PARTITIONS](#global_sort_partitions)       | Number of partition to use for shuffling of data during sorting |
-
+| [SCALE_FACTOR](#scale_factor)                           | Control the partition size for RANGE_COLUMN feature          |
+-
   You can use the following options to load data:
 
   - ##### DELIMITER: 
@@ -268,15 +269,31 @@ CarbonData DML statements are documented here,which includes:
   - ##### GLOBAL_SORT_PARTITIONS:
 
     If the SORT_SCOPE is defined as GLOBAL_SORT, then user can specify the number of partitions to use while shuffling data for sort using GLOBAL_SORT_PARTITIONS. If it is not configured, or configured less than 1, then it uses the number of map task as reduce task. It is recommended that each reduce task deal with 512MB-1GB data.
-
+    For RANGE_COLUMN, GLOBAL_SORT_PARTITIONS is used to specify the number of range partitions also.
   ```
   OPTIONS('GLOBAL_SORT_PARTITIONS'='2')
   ```
 
-   NOTE:
+   **NOTE:**
    * GLOBAL_SORT_PARTITIONS should be Integer type, the range is [1,Integer.MaxValue].
    * It is only used when the SORT_SCOPE is GLOBAL_SORT.
 
+   - ##### SCALE_FACTOR
+
+   For RANGE_COLUMN, SCALE_FACTOR is used to control the number of range partitions as following.
+   ```
+     splitSize = max(blocklet_size, (block_size - blocklet_size)) * scale_factor
+     numPartitions = total size of input data / splitSize
+   ```
+   The default value is 3, and the range is [1, 300].
+
+   ```
+     OPTIONS('SCALE_FACTOR'='10')
+   ```
+   **NOTE:**
+   * If both GLOBAL_SORT_PARTITIONS and SCALE_FACTOR are used at the same time, only GLOBAL_SORT_PARTITIONS is valid.
+   * The compaction on RANGE_COLUMN will use LOCAL_SORT by default.
+
 ### INSERT DATA INTO CARBONDATA TABLE
 
   This command inserts data into a CarbonData table, it is defined as a combination of two queries Insert and Select query respectively. 


[carbondata] 03/27: [CARBONDATA-3243] Updated DOC for No-Sort Compaction and a few Fixes

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit da235f288a95182f405cdc22499d33ea671757d8
Author: namanrastogi <na...@gmail.com>
AuthorDate: Thu Jan 10 14:40:23 2019 +0530

    [CARBONDATA-3243] Updated DOC for No-Sort Compaction and a few Fixes
    
    Updated Doc
    Checking SORT_SCOPE in session property CARBON.TABLE.LOAD.SORT.SCOPE in CarbonTable.getSortScope()
    Changed default Sort Scope in SortScopeOptions.getSortScope()
    Validation for Load Option SORT_SCOPE
    Add the iterator in priority queue only of record is found in iterator, else not.
    
    This closes #3064
---
 .../carbondata/core/constants/SortScopeOptions.java  |  2 +-
 .../core/metadata/schema/table/CarbonTable.java      | 20 ++++++++++++++------
 docs/configuration-parameters.md                     |  1 +
 docs/dml-of-carbondata.md                            |  9 ++++++++-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala      | 11 +++++++++++
 .../sortdata/SingleThreadFinalSortFilesMerger.java   |  2 +-
 6 files changed, 36 insertions(+), 9 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/SortScopeOptions.java b/core/src/main/java/org/apache/carbondata/core/constants/SortScopeOptions.java
index 9225bb4..fe7b4e9 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/SortScopeOptions.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/SortScopeOptions.java
@@ -36,7 +36,7 @@ public class SortScopeOptions {
       case "NO_SORT":
         return SortScope.NO_SORT;
       default:
-        return SortScope.LOCAL_SORT;
+        return getSortScope(CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT);
     }
   }
 
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index f89dd6c..c4adab4 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -1357,12 +1357,20 @@ public class CarbonTable implements Serializable {
       if (getNumberOfSortColumns() == 0) {
         return SortScopeOptions.SortScope.NO_SORT;
       } else {
-        return SortScopeOptions.getSortScope(
-            CarbonProperties.getInstance().getProperty(
-                CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
-                CarbonProperties.getInstance().getProperty(
-                    CarbonCommonConstants.LOAD_SORT_SCOPE,
-                    CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)));
+        // Check SORT_SCOPE in Session Properties first.
+        String sortScopeSessionProp = CarbonProperties.getInstance().getProperty(
+            CarbonLoadOptionConstants.CARBON_TABLE_LOAD_SORT_SCOPE + getDatabaseName() + "."
+                + getTableName());
+        if (null != sortScopeSessionProp) {
+          return SortScopeOptions.getSortScope(sortScopeSessionProp);
+        }
+
+        // If SORT_SCOPE is not found in Session Properties,
+        // then retrieve it from Table.
+        return SortScopeOptions.getSortScope(CarbonProperties.getInstance()
+            .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
+                CarbonProperties.getInstance()
+                    .getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, "LOCAL_SORT")));
       }
     } else {
       return SortScopeOptions.getSortScope(sortScope);
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index 105b768..c7d8152 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -208,6 +208,7 @@ RESET
 | carbon.options.date.format                 | Specifies the data format of the date columns in the data being loaded |
 | carbon.options.timestamp.format            | Specifies the timestamp format of the time stamp columns in the data being loaded |
 | carbon.options.sort.scope                 | Specifies how the current data load should be sorted with. **NOTE:** Refer to [Data Loading Configuration](#data-loading-configuration)#carbon.sort.scope for detailed information. |
+| carbon.table.load.sort.scope              | Overrides the SORT_SCOPE provided in CREATE TABLE.           |
 | carbon.options.global.sort.partitions     |                                                              |
 | carbon.options.serialization.null.format  | Default Null value representation in the data being loaded. **NOTE:** Refer to [Data Loading Configuration](#data-loading-configuration)#carbon.options.serialization.null.format for detailed information. |
 | carbon.query.directQueryOnDataMap.enabled | Specifies whether datamap can be queried directly. This is useful for debugging purposes.**NOTE: **Refer to [Query Configuration](#query-configuration) for detailed information. |
diff --git a/docs/dml-of-carbondata.md b/docs/dml-of-carbondata.md
index c8d72ef..d6e5932 100644
--- a/docs/dml-of-carbondata.md
+++ b/docs/dml-of-carbondata.md
@@ -49,6 +49,7 @@ CarbonData DML statements are documented here,which includes:
 | [COMMENTCHAR](#commentchar)                             | Character used to comment the rows in the input csv file. Those rows will be skipped from processing |
 | [HEADER](#header)                                       | Whether the input csv files have header row                  |
 | [FILEHEADER](#fileheader)                               | If header is not present in the input csv, what is the column names to be used for data read from input csv |
+| [SORT_SCOPE](#sort_scope)                               | Sort Scope to be used for current load.                      |
 | [MULTILINE](#multiline)                                 | Whether a row data can span across multiple lines.           |
 | [ESCAPECHAR](#escapechar)                               | Escape character used to excape the data in input csv file.For eg.,\ is a standard escape character |
 | [SKIP_EMPTY_LINE](#skip_empty_line)                     | Whether empty lines in input csv file should be skipped or loaded as null row |
@@ -106,6 +107,13 @@ CarbonData DML statements are documented here,which includes:
     OPTIONS('FILEHEADER'='column1,column2') 
     ```
 
+  - ##### SORT_SCOPE:
+  Sort Scope to be used for the current load. This overrides the Sort Scope of Table.
+
+  ```
+  OPTIONS('SORT_SCOPE'='BATCH_SORT')
+  ```
+
   - ##### MULTILINE:
 
     CSV with new line character in quotes.
@@ -467,4 +475,3 @@ CarbonData DML statements are documented here,which includes:
   ```
   CLEAN FILES FOR TABLE carbon_table
   ```
-
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 523d59c..dc75243 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -35,6 +35,7 @@ import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.exception.InvalidConfigurationException
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.metadata.schema.PartitionInfo
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
@@ -1217,6 +1218,16 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
         }
       }
 
+    // Validate SORT_SCOPE
+    if (options.exists(_._1.equalsIgnoreCase("SORT_SCOPE"))) {
+      val optionValue: String = options.get("sort_scope").get.head._2
+      if (!CarbonUtil.isValidSortOption(optionValue)) {
+        throw new InvalidConfigurationException(
+          s"Passing invalid SORT_SCOPE '$optionValue', valid SORT_SCOPE are 'NO_SORT'," +
+          s" 'BATCH_SORT', 'LOCAL_SORT' and 'GLOBAL_SORT' ")
+      }
+    }
+
     // check for duplicate options
     val duplicateOptions = options filter {
       case (_, optionList) => optionList.size > 1
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
index d243749..bd9526f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
@@ -135,8 +135,8 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
               noDicAndComplexColumns, sortParameters, measureDataType);
       if (inMemorySortTempChunkHolder.hasNext()) {
         inMemorySortTempChunkHolder.readRow();
+        recordHolderHeapLocal.add(inMemorySortTempChunkHolder);
       }
-      recordHolderHeapLocal.add(inMemorySortTempChunkHolder);
     }
   }
 


[carbondata] 14/27: [CARBONDATA-3232] Add example and doc for alluxio integration

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit d288c4f704364b1ad382c190d08d615e2b5a21d5
Author: xubo245 <xu...@huawei.com>
AuthorDate: Mon Jan 7 20:27:37 2019 +0800

    [CARBONDATA-3232] Add example and doc for alluxio integration
    
    Optimize carbonData usage with alluxio:
    1.Add doc
    2.optimize the example
    
    This closes #3054
---
 README.md                                          |   1 +
 docs/alluxio-guide.md                              | 136 +++++++++++++++++++++
 docs/documentation.md                              |   6 +-
 docs/introduction.md                               |   4 +-
 docs/quick-start-guide.md                          |  17 ++-
 examples/spark2/pom.xml                            |  10 ++
 .../carbondata/examples/AlluxioExample.scala       | 115 ++++++++++++-----
 .../carbondata/examples/util/ExampleUtils.scala    |  13 +-
 8 files changed, 264 insertions(+), 38 deletions(-)

diff --git a/README.md b/README.md
index a788cea..bed906f 100644
--- a/README.md
+++ b/README.md
@@ -70,6 +70,7 @@ CarbonData is built using Apache Maven, to [build CarbonData](https://github.com
 ##  Integration
 * [Hive](https://github.com/apache/carbondata/blob/master/docs/hive-guide.md)
 * [Presto](https://github.com/apache/carbondata/blob/master/docs/presto-guide.md)
+* [Alluxio](https://github.com/apache/carbondata/blob/master/docs/alluxio-guide.md)
 
 ## Other Technical Material
 * [Apache CarbonData meetup material](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=66850609)
diff --git a/docs/alluxio-guide.md b/docs/alluxio-guide.md
new file mode 100644
index 0000000..b1bfeeb
--- /dev/null
+++ b/docs/alluxio-guide.md
@@ -0,0 +1,136 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more 
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership. 
+    The ASF licenses this file to you under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with 
+    the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software 
+    distributed under the License is distributed on an "AS IS" BASIS, 
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and 
+    limitations under the License.
+-->
+
+
+# Alluxio guide
+This tutorial provides a brief introduction to using Alluxio.
+ - How to use Alluxio in CarbonData?
+    - [Running alluxio example in CarbonData project by IDEA](#Running alluxio example in CarbonData project by IDEA)
+    - [CarbonData supports alluxio by spark-shell](#CarbonData supports alluxio by spark-shell)
+    - [CarbonData supports alluxio by spark-submit](#CarbonData supports alluxio by spark-submit)
+     
+## Running alluxio example in CarbonData project by IDEA
+
+### [Building CarbonData](https://github.com/apache/carbondata/tree/master/build)
+ - Please refer to [Building CarbonData](https://github.com/apache/carbondata/tree/master/build).   
+ - Users need to install IDEA and scala plugin, and import CarbonData project.
+ 
+### Installing and starting Alluxio
+ - Please refer to [https://www.alluxio.org/docs/1.8/en/Getting-Started.html#starting-alluxio](https://www.alluxio.org/docs/1.8/en/Getting-Started.html#starting-alluxio)   
+ - Access the Alluxio web: [http://localhost:19999/home](http://localhost:19999/home)   
+
+### Running Example
+ - Please refer to [AlluxioExample](https://github.com/apache/carbondata/blob/master/examples/spark2/src/main/scala/org/apache/carbondata/examples/AlluxioExample.scala)
+
+## CarbonData supports alluxio by spark-shell
+
+### [Building CarbonData](https://github.com/apache/carbondata/tree/master/build)
+ - Please refer to [Building CarbonData](https://github.com/apache/carbondata/tree/master/build).   
+
+### Preparing Spark
+ - Please refer to [http://spark.apache.org/docs/latest/](http://spark.apache.org/docs/latest/)
+
+### Downloading alluxio and uncompressing it
+ - Please refer to [https://www.alluxio.org/download](https://www.alluxio.org/download)
+ 
+### Running spark-shell
+ - Running the command in spark path
+ ```$command
+./bin/spark-shell --jars ${CARBONDATA_PATH}/assembly/target/scala-2.11/apache-carbondata-1.6.0-SNAPSHOT-bin-spark2.2.1-hadoop2.7.2.jar,${ALLUXIO_PATH}/client/alluxio-1.8.1-client.jar
+```
+ - Testing use alluxio by CarbonSession
+ ```$scala
+import org.apache.spark.sql.CarbonSession._
+import org.apache.spark.sql.SparkSession
+	
+val carbon = SparkSession.builder().master("local").appName("test").getOrCreateCarbonSession("alluxio://localhost:19998/carbondata");
+carbon.sql("CREATE TABLE carbon_alluxio(id String,name String, city String,age Int) STORED as carbondata");
+carbon.sql(s"LOAD DATA LOCAL INPATH '${CARBONDATA_PATH}/integration/spark-common-test/src/test/resources/sample.csv' into table carbon_alluxio");
+carbon.sql("select * from carbon_alluxio").show
+```
+ - Result
+ ```$scala
+ scala> carbon.sql("select * from carbon_alluxio").show
+ +---+------+---------+---+
+ | id|  name|     city|age|
+ +---+------+---------+---+
+ |  1| david| shenzhen| 31|
+ |  2| eason| shenzhen| 27|
+ |  3| jarry|    wuhan| 35|
+ |  3| jarry|Bangalore| 35|
+ |  4| kunal|    Delhi| 26|
+ |  4|vishal|Bangalore| 29|
+ +---+------+---------+---+
+ ```
+## CarbonData supports alluxio by spark-submit
+
+### [Building CarbonData](https://github.com/apache/carbondata/tree/master/build)
+ - Please refer to [Building CarbonData](https://github.com/apache/carbondata/tree/master/build).   
+
+### Preparing Spark
+ - Please refer to [http://spark.apache.org/docs/latest/](http://spark.apache.org/docs/latest/)
+
+### Downloading alluxio and uncompressing it
+ - Please refer to [https://www.alluxio.org/download](https://www.alluxio.org/download)
+ 
+### Running spark-submit
+#### Upload data to alluxio
+```$command
+./bin/alluxio fs  copyFromLocal ${CARBONDATA_PATH}/hadoop/src/test/resources/data.csv /
+```
+#### Command
+```$command
+./bin/spark-submit \
+--master local \
+--jars ${ALLUXIO_PATH}/client/alluxio-1.8.1-client.jar,${CARBONDATA_PATH}/examples/spark2/target/carbondata-examples-1.6.0-SNAPSHOT.jar \
+--class org.apache.carbondata.examples.AlluxioExample \
+${CARBONDATA_PATH}/assembly/target/scala-2.11/apache-carbondata-1.6.0-SNAPSHOT-bin-spark2.2.1-hadoop2.7.2.jar \
+false
+```
+**NOTE**: Please set runShell as false, which can avoid dependency on alluxio shell module.
+
+#### Result
+```$command
++-----------------+-------+--------------------+--------------------+---------+-----------+---------+----------+
+|SegmentSequenceId| Status|     Load Start Time|       Load End Time|Merged To|File Format|Data Size|Index Size|
++-----------------+-------+--------------------+--------------------+---------+-----------+---------+----------+
+|                1|Success|2019-01-09 15:10:...|2019-01-09 15:10:...|       NA|COLUMNAR_V3|  23.92KB|    1.07KB|
+|                0|Success|2019-01-09 15:10:...|2019-01-09 15:10:...|       NA|COLUMNAR_V3|  23.92KB|    1.07KB|
++-----------------+-------+--------------------+--------------------+---------+-----------+---------+----------+
+
++-------+------+
+|country|amount|
++-------+------+
+| france|   202|
+|  china|  1698|
++-------+------+
+
++-----------------+---------+--------------------+--------------------+---------+-----------+---------+----------+
+|SegmentSequenceId|   Status|     Load Start Time|       Load End Time|Merged To|File Format|Data Size|Index Size|
++-----------------+---------+--------------------+--------------------+---------+-----------+---------+----------+
+|                3|Compacted|2019-01-09 15:10:...|2019-01-09 15:10:...|      0.1|COLUMNAR_V3|  23.92KB|    1.03KB|
+|                2|Compacted|2019-01-09 15:10:...|2019-01-09 15:10:...|      0.1|COLUMNAR_V3|  23.92KB|    1.07KB|
+|                1|Compacted|2019-01-09 15:10:...|2019-01-09 15:10:...|      0.1|COLUMNAR_V3|  23.92KB|    1.07KB|
+|              0.1|  Success|2019-01-09 15:10:...|2019-01-09 15:10:...|       NA|COLUMNAR_V3|  37.65KB|    1.08KB|
+|                0|Compacted|2019-01-09 15:10:...|2019-01-09 15:10:...|      0.1|COLUMNAR_V3|  23.92KB|    1.07KB|
++-----------------+---------+--------------------+--------------------+---------+-----------+---------+----------+
+
+```
+
+## Reference
+[1] https://www.alluxio.org/docs/1.8/en/Getting-Started.html
+[2] https://www.alluxio.org/docs/1.8/en/compute/Spark.html
\ No newline at end of file
diff --git a/docs/documentation.md b/docs/documentation.md
index d1261a1..a40eed1 100644
--- a/docs/documentation.md
+++ b/docs/documentation.md
@@ -29,7 +29,7 @@ Apache CarbonData is a new big data file format for faster interactive query usi
 
 **Quick Start:** [Run an example program](./quick-start-guide.md#installing-and-configuring-carbondata-to-run-locally-with-spark-shell) on your local machine or [study some examples](https://github.com/apache/carbondata/tree/master/examples/spark2/src/main/scala/org/apache/carbondata/examples).
 
-**CarbonData SQL Language Reference:** CarbonData extends the Spark SQL language and adds several [DDL](./ddl-of-carbondata.md) and [DML](./dml-of-carbondata.md) statements to support operations on it.Refer to the [Reference Manual](./language-manual.md) to understand the supported features and functions.
+**CarbonData SQL Language Reference:** CarbonData extends the Spark SQL language and adds several [DDL](./ddl-of-carbondata.md) and [DML](./dml-of-carbondata.md) statements to support operations on it. Refer to the [Reference Manual](./language-manual.md) to understand the supported features and functions.
 
 **Programming Guides:** You can read our guides about [Java APIs supported](./sdk-guide.md) or [C++ APIs supported](./csdk-guide.md) to learn how to integrate CarbonData with your applications.
 
@@ -37,7 +37,9 @@ Apache CarbonData is a new big data file format for faster interactive query usi
 
 ## Integration
 
-CarbonData can be integrated with popular Execution engines like [Spark](./quick-start-guide.md#spark) , [Presto](./quick-start-guide.md#presto) and [Hive](./quick-start-guide.md#hive).Refer to the [Installation and Configuration](./quick-start-guide.md#integration) section to understand all modes of Integrating CarbonData.
+ - CarbonData can be integrated with popular execution engines like [Spark](./quick-start-guide.md#spark) , [Presto](./quick-start-guide.md#presto) and [Hive](./quick-start-guide.md#hive).
+ - CarbonData can be integrated with popular storage engines like HDFS, Huawei Cloud(OBS) and [Alluxio](./quick-start-guide.md#alluxio).   
+  Refer to the [Installation and Configuration](./quick-start-guide.md#integration) section to understand all modes of Integrating CarbonData.
 
 
 
diff --git a/docs/introduction.md b/docs/introduction.md
index 2ab6dd4..037a296 100644
--- a/docs/introduction.md
+++ b/docs/introduction.md
@@ -115,8 +115,10 @@ CarbonData has rich set of features to support various use cases in Big Data ana
 
 - ##### HDFS
 
-  CarbonData uses HDFS api to write and read data from HDFS.CarbonData can take advantage of the locality information to efficiently suggest spark to run tasks near to the data.
+  CarbonData uses HDFS api to write and read data from HDFS. CarbonData can take advantage of the locality information to efficiently suggest spark to run tasks near to the data.
 
+- ##### Alluxio
+  CarbonData also supports read and write with [Alluxio](./quick-start-guide.md#alluxio). 
 
 
 ## Integration with Big Data ecosystem
diff --git a/docs/quick-start-guide.md b/docs/quick-start-guide.md
index b7b20a8..244a9ae 100644
--- a/docs/quick-start-guide.md
+++ b/docs/quick-start-guide.md
@@ -35,9 +35,10 @@ This tutorial provides a quick introduction to using CarbonData. To follow along
 
 ## Integration
 
-CarbonData can be integrated with Spark,Presto and Hive Execution Engines. The below documentation guides on Installing and Configuring with these execution engines.
+### Integration with Execution Engines
+CarbonData can be integrated with Spark,Presto and Hive execution engines. The below documentation guides on Installing and Configuring with these execution engines.
 
-### Spark
+#### Spark
 
 [Installing and Configuring CarbonData to run locally with Spark Shell](#installing-and-configuring-carbondata-to-run-locally-with-spark-shell)
 
@@ -48,13 +49,21 @@ CarbonData can be integrated with Spark,Presto and Hive Execution Engines. The b
 [Installing and Configuring CarbonData Thrift Server for Query Execution](#query-execution-using-carbondata-thrift-server)
 
 
-### Presto
+#### Presto
 [Installing and Configuring CarbonData on Presto](#installing-and-configuring-carbondata-on-presto)
 
-### Hive
+#### Hive
 [Installing and Configuring CarbonData on Hive](https://github.com/apache/carbondata/blob/master/docs/hive-guide.md)
 
+### Integration with Storage Engines
+#### HDFS
+[CarbonData supports read and write with HDFS](https://github.com/apache/carbondata/blob/master/docs/quick-start-guide.md#installing-and-configuring-carbondata-on-standalone-spark-cluster)
 
+#### S3
+[CarbonData supports read and write with S3](https://github.com/apache/carbondata/blob/master/docs/s3-guide.md) 
+
+#### Alluxio
+[CarbonData supports read and write with Alluxio](https://github.com/apache/carbondata/blob/master/docs/alluxio-guide.md)
 
 ## Installing and Configuring CarbonData to run locally with Spark Shell
 
diff --git a/examples/spark2/pom.xml b/examples/spark2/pom.xml
index b47b0af..e233226 100644
--- a/examples/spark2/pom.xml
+++ b/examples/spark2/pom.xml
@@ -105,6 +105,16 @@
       <artifactId>carbondata-core</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.alluxio</groupId>
+      <artifactId>alluxio-core-client-hdfs</artifactId>
+      <version>1.8.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.alluxio</groupId>
+      <artifactId>alluxio-shell</artifactId>
+      <version>1.8.1</version>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/AlluxioExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/AlluxioExample.scala
index 31110ce..f757bee 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/AlluxioExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/AlluxioExample.scala
@@ -17,6 +17,11 @@
 
 package org.apache.carbondata.examples
 
+import java.io.File
+import java.text.SimpleDateFormat
+import java.util.Date
+
+import alluxio.cli.fs.FileSystemShell
 import org.apache.spark.sql.SparkSession
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -24,50 +29,104 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.examples.util.ExampleUtils
 
-
 /**
  * configure alluxio:
  * 1.start alluxio
- * 2.upload the jar :"/alluxio_path/core/client/target/
- * alluxio-core-client-YOUR-VERSION-jar-with-dependencies.jar"
- * 3.Get more detail at:http://www.alluxio.org/docs/master/en/Running-Spark-on-Alluxio.html
+ * 2. Please upload data to alluxio if you set runShell as false
+ * ./bin/alluxio fs copyFromLocal /carbondata_path/hadoop/src/test/resources/data.csv /
+ * 3.Get more details at: https://www.alluxio.org/docs/1.8/en/compute/Spark.html
  */
-
 object AlluxioExample {
-  def main(args: Array[String]) {
-    val spark = ExampleUtils.createCarbonSession("AlluxioExample")
-    exampleBody(spark)
-    spark.close()
+  def main (args: Array[String]) {
+    val carbon = ExampleUtils.createCarbonSession("AlluxioExample",
+      storePath = "alluxio://localhost:19998/carbondata")
+    val runShell: Boolean = if (null != args && args.length > 0) {
+      args(0).toBoolean
+    } else {
+      true
+    }
+    exampleBody(carbon, runShell)
+    carbon.close()
   }
 
-  def exampleBody(spark : SparkSession): Unit = {
+  def exampleBody (spark: SparkSession, runShell: Boolean = true): Unit = {
+    val rootPath = new File(this.getClass.getResource("/").getPath
+      + "../../../..").getCanonicalPath
     spark.sparkContext.hadoopConfiguration.set("fs.alluxio.impl", "alluxio.hadoop.FileSystem")
     FileFactory.getConfiguration.set("fs.alluxio.impl", "alluxio.hadoop.FileSystem")
 
     // Specify date format based on raw data
     CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy/MM/dd")
+            .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy/MM/dd")
+    val time = new SimpleDateFormat("yyyyMMddHHmmssSSS").format(new Date())
+    val alluxioPath = "alluxio://localhost:19998"
+    var alluxioFile = alluxioPath + "/data.csv"
+
+    val remoteFile = "/carbon_alluxio" + time + ".csv"
+
+    var mFsShell: FileSystemShell = null
+
+    // avoid dependency alluxio shell when running it with spark-submit
+    if (runShell) {
+      mFsShell = new FileSystemShell()
+      alluxioFile = alluxioPath + remoteFile
+      val localFile = rootPath + "/hadoop/src/test/resources/data.csv"
+      mFsShell.run("copyFromLocal", localFile, remoteFile)
+    }
+
+    import spark._
+
+    sql("DROP TABLE IF EXISTS alluxio_table")
 
-    spark.sql("DROP TABLE IF EXISTS alluxio_table")
+    sql(
+      s"""
+         | CREATE TABLE IF NOT EXISTS alluxio_table(
+         |    ID Int,
+         |    date Date,
+         |    country String,
+         |    name String,
+         |    phonetype String,
+         |    serialname String,
+         |    salary Int)
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES(
+         |    'SORT_COLUMNS' = 'phonetype,name',
+         |    'DICTIONARY_INCLUDE'='phonetype',
+         |    'TABLE_BLOCKSIZE'='32',
+         |    'AUTO_LOAD_MERGE'='true')
+       """.stripMargin)
 
-    spark.sql("""
-           CREATE TABLE IF NOT EXISTS alluxio_table
-           (ID Int, date Date, country String,
-           name String, phonetype String, serialname String, salary Int)
-           STORED BY 'carbondata'
-           """)
+    for (i <- 0 until 2) {
+      sql(
+        s"""
+           | LOAD DATA LOCAL INPATH '$alluxioFile'
+           | into table alluxio_table
+         """.stripMargin)
+    }
 
-    spark.sql(s"""
-           LOAD DATA LOCAL INPATH 'alluxio://localhost:19998/data.csv' into table alluxio_table
-           """)
+    sql("SELECT * FROM alluxio_table").show()
 
-    spark.sql("""
-           SELECT country, count(salary) AS amount
-           FROM alluxio_table
-           WHERE country IN ('china','france')
-           GROUP BY country
-           """).show()
+    sql("SHOW SEGMENTS FOR TABLE alluxio_table").show()
+    sql(
+      """
+        | SELECT country, count(salary) AS amount
+        | FROM alluxio_table
+        | WHERE country IN ('china','france')
+        | GROUP BY country
+      """.stripMargin).show()
 
-    spark.sql("DROP TABLE IF EXISTS alluxio_table")
+    for (i <- 0 until 2) {
+      sql(
+        s"""
+           | LOAD DATA LOCAL INPATH '$alluxioFile'
+           | into table alluxio_table
+         """.stripMargin)
+    }
+    sql("SHOW SEGMENTS FOR TABLE alluxio_table").show()
+    if (runShell) {
+      mFsShell.run("rm", remoteFile)
+      mFsShell.close()
+    }
+    sql("DROP TABLE IF EXISTS alluxio_table")
   }
 }
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala
index bb9f4d0..b6e3f4b 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala
@@ -30,13 +30,20 @@ object ExampleUtils {
       .getCanonicalPath
   val storeLocation: String = currentPath + "/target/store"
 
-  def createCarbonSession(appName: String, workThreadNum: Int = 1): SparkSession = {
+  def createCarbonSession (appName: String, workThreadNum: Int = 1,
+      storePath: String = null): SparkSession = {
     val rootPath = new File(this.getClass.getResource("/").getPath
-                            + "../../../..").getCanonicalPath
-    val storeLocation = s"$rootPath/examples/spark2/target/store"
+      + "../../../..").getCanonicalPath
+
     val warehouse = s"$rootPath/examples/spark2/target/warehouse"
     val metaStoreDB = s"$rootPath/examples/spark2/target"
 
+    val storeLocation = if (null != storePath) {
+      storePath
+    } else {
+      s"$rootPath/examples/spark2/target/store"
+    }
+
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd HH:mm:ss")
       .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy/MM/dd")


[carbondata] 08/27: [CARBONDATA-3257] Fix for NO_SORT load and describe formatted being in NO_SORT flow even with Sort Columns given

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit a58fc00aae7d5a25c2f151b2ad15f0e8b02b48e6
Author: manishnalla1994 <ma...@gmail.com>
AuthorDate: Mon Jan 21 17:23:37 2019 +0530

    [CARBONDATA-3257] Fix for NO_SORT load and describe formatted being in NO_SORT flow even with Sort Columns given
    
    Problem: Data Load is in No sort flow when version is upgraded even if sort columns are given. Also describe formatted displays wrong sort scope after refresh.
    
    Solution: Added a condition to check for the presence of Sort Columns.
    
    This closes #3083
---
 .../core/constants/CarbonCommonConstants.java      |  1 +
 .../sdv/generated/SetParameterTestCase.scala       |  8 +++---
 .../command/carbonTableSchemaCommon.scala          | 12 ---------
 .../command/management/CarbonLoadDataCommand.scala | 31 +++++++++++++++-------
 .../table/CarbonDescribeFormattedCommand.scala     | 18 ++++++++++---
 5 files changed, 42 insertions(+), 28 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index b7d9761..86bf5f1 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -426,6 +426,7 @@ public final class CarbonCommonConstants {
    */
   public static final String DICTIONARY_PATH = "dictionary_path";
   public static final String SORT_COLUMNS = "sort_columns";
+  public static final String SORT_SCOPE = "sort_scope";
   public static final String RANGE_COLUMN = "range_column";
   public static final String PARTITION_TYPE = "partition_type";
   public static final String NUM_PARTITIONS = "num_partitions";
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SetParameterTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SetParameterTestCase.scala
index 8c336d8..54d9e3f 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SetParameterTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SetParameterTestCase.scala
@@ -209,11 +209,11 @@ class SetParameterTestCase extends QueryTest with BeforeAndAfterAll {
     sql("SET carbon.options.sort.scope=local_sort")
     sql(
       "create table carbon_table(empno int, empname String, designation String, doj Timestamp," +
-      "workgroupcategory int) STORED BY 'org.apache.carbondata.format'")
-    checkExistence(sql("DESC FORMATTED carbon_table"), true, "LOCAL_SORT")
-    val sortscope=sql("DESC FORMATTED carbon_table").collect().filter(_.getString(1).trim.equals("LOCAL_SORT"))
+      "workgroupcategory int) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_COLUMNS'='empno,empname')")
+    checkExistence(sql("DESC FORMATTED carbon_table"), true, "local_sort")
+    val sortscope=sql("DESC FORMATTED carbon_table").collect().filter(_.getString(1).trim.equals("local_sort"))
     assertResult(1)(sortscope.length)
-    assertResult("LOCAL_SORT")(sortscope(0).getString(1).trim)
+    assertResult("local_sort")(sortscope(0).getString(1).trim)
   }
 
   test("TC_011-test SET property to Enable Unsafe Sort") {
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 2ce9d89..b6b4e8d 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -854,18 +854,6 @@ class TableNewProcessor(cm: TableModel) {
       tableSchema.getTableId,
       cm.databaseNameOp.getOrElse("default"))
     tablePropertiesMap.put("bad_record_path", badRecordsPath)
-    if (tablePropertiesMap.get("sort_columns") != null) {
-      val sortCol = tablePropertiesMap.get("sort_columns")
-      if ((!sortCol.trim.isEmpty) && tablePropertiesMap.get("sort_scope") == null) {
-        // If sort_scope is not specified, but sort_columns are present, set sort_scope as
-        // local_sort in carbon_properties (cannot add in table properties as if user sets carbon
-        // properties it won't be reflected as table properties is given higher priority)
-        if (CarbonProperties.getInstance().getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE) ==
-            null) {
-          tablePropertiesMap.put("sort_scope", "LOCAL_SORT")
-        }
-      }
-    }
     tableSchema.setTableProperties(tablePropertiesMap)
     if (cm.bucketFields.isDefined) {
       val bucketCols = cm.bucketFields.get.bucketColumns.map { b =>
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 0030156..242a467 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -209,15 +209,28 @@ case class CarbonLoadDataCommand(
     * 4. Session property CARBON_OPTIONS_SORT_SCOPE
     * 5. Default Sort Scope LOAD_SORT_SCOPE
     */
-    optionsFinal.put("sort_scope",
-      options.getOrElse("sort_scope",
-        carbonProperty.getProperty(
-          CarbonLoadOptionConstants.CARBON_TABLE_LOAD_SORT_SCOPE + table.getDatabaseName + "." +
-          table.getTableName,
-          tableProperties.asScala.getOrElse("sort_scope",
-            carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
-              carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
-                CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))))
+    if (tableProperties.get(CarbonCommonConstants.SORT_COLUMNS) != null &&
+        tableProperties.get(CarbonCommonConstants.SORT_SCOPE) == null) {
+      // If there are Sort Columns given for the table and Sort Scope is not specified,
+      // we will take it as whichever sort scope given or LOCAL_SORT as default
+      optionsFinal
+        .put(CarbonCommonConstants.SORT_SCOPE,
+          carbonProperty
+            .getProperty(
+              CarbonLoadOptionConstants.CARBON_TABLE_LOAD_SORT_SCOPE + table.getDatabaseName + "." +
+              table.getTableName, carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+                SortScopeOptions.getSortScope("LOCAL_SORT").toString)))
+    } else {
+      optionsFinal.put(CarbonCommonConstants.SORT_SCOPE,
+        options.getOrElse(CarbonCommonConstants.SORT_SCOPE,
+          carbonProperty.getProperty(
+            CarbonLoadOptionConstants.CARBON_TABLE_LOAD_SORT_SCOPE + table.getDatabaseName + "." +
+            table.getTableName,
+            tableProperties.asScala.getOrElse(CarbonCommonConstants.SORT_SCOPE,
+              carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
+                carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+                  CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))))
+    }
 
     optionsFinal
       .put("bad_record_path", CarbonBadRecordUtil.getBadRecordsPath(options.asJava, table))
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
index 69db4e0..e541139 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
@@ -30,11 +30,11 @@ import org.apache.spark.sql.execution.command.MetadataCommand
 import org.apache.spark.sql.hive.CarbonRelation
 
 import org.apache.carbondata.common.Strings
-import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 
 private[sql] case class CarbonDescribeFormattedCommand(
     child: SparkPlan,
@@ -54,10 +54,22 @@ private[sql] case class CarbonDescribeFormattedCommand(
 
     val carbonTable = relation.carbonTable
     val tblProps = carbonTable.getTableInfo.getFactTable.getTableProperties.asScala
+    // If Sort Columns are given and Sort Scope is not given in either table properties
+    // or carbon properties then pass LOCAL_SORT as the sort scope,
+    // else pass NO_SORT
     val sortScope = if (carbonTable.getNumberOfSortColumns == 0) {
       "NO_SORT"
     } else {
-      tblProps.getOrElse("sort_scope", CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
+      if (tblProps.contains(CarbonCommonConstants.SORT_SCOPE)) {
+        tblProps.get(CarbonCommonConstants.SORT_SCOPE).toString
+      } else {
+        tblProps
+          .getOrElse(CarbonCommonConstants.SORT_SCOPE,
+            CarbonProperties.getInstance()
+              .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
+                CarbonProperties.getInstance().getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+                  "LOCAL_SORT")))
+      }
     }
     val streaming: String = if (carbonTable.isStreamingSink) {
       "sink"


[carbondata] 20/27: [HOTFIX] Fix select query on varchar column with large data fails with jvm crash

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit df92df1df343f7084084c3d5d5c42f48ed6c14f1
Author: shardul-cr7 <sh...@gmail.com>
AuthorDate: Fri Jan 25 16:43:45 2019 +0530

    [HOTFIX] Fix select query on varchar column with large data fails with jvm crash
    
    Problem : When select query fired on varchar column having large data it results in JVM crash because when when we increase the ReusableBuffer by 30% the new size gets gets reduced because requestSize * 30 gets out of range for int which gives a negative value and the total size gets reduced.
    
    Solution : While assigning the size to ByteBuffer we first check if total size less than the requested size then we pass requested size to to the ByteBuffer.
    
    This closes #3104
---
 .../apache/carbondata/core/constants/CarbonCommonConstants.java  | 7 +++++++
 .../org/apache/carbondata/core/datastore/ReusableDataBuffer.java | 9 ++++++++-
 2 files changed, 15 insertions(+), 1 deletion(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 86bf5f1..f5c07a4 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1387,6 +1387,13 @@ public final class CarbonCommonConstants {
   public static final int CARBON_DYNAMIC_ALLOCATION_SCHEDULER_THREAD_SLEEP_TIME = 250;
 
   /**
+   * We increment the requested page size by 30% only if the requested size is less than 10MB.
+   * Otherwise we take the original requested page size.This parameter will be used till the
+   * size based page implementation comes in carbon.
+   */
+  public static final int REQUESTED_PAGE_SIZE_MAX = 10485760;
+
+  /**
    * It allows queries on hive metastore directly along with filter information, otherwise first
    * fetches all partitions from hive and apply filters on it.
    */
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/ReusableDataBuffer.java b/core/src/main/java/org/apache/carbondata/core/datastore/ReusableDataBuffer.java
index e0c234a..d0add0c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/ReusableDataBuffer.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/ReusableDataBuffer.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datastore;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 
 /**
  * class holds the reusable data buffer based on request it will resize.
@@ -47,7 +48,13 @@ public class ReusableDataBuffer {
    */
   public byte[] getDataBuffer(int requestedSize) {
     if (dataBuffer == null || requestedSize > size) {
-      this.size = requestedSize + ((requestedSize * 30) / 100);
+      // increase by 30% only if the requestedSize less than 10 MB
+      // otherwise take the original requestedSize.
+      if (requestedSize < CarbonCommonConstants.REQUESTED_PAGE_SIZE_MAX) {
+        this.size = requestedSize + ((requestedSize * 30) / 100);
+      } else {
+        this.size = requestedSize;
+      }
       dataBuffer = new byte[size];
     }
     return dataBuffer;


[carbondata] 12/27: [HOTFIX] presto carbon doesn't work with Hadoop conf in cluster.

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit 1aadaabb39b1d31a0019a3fb2e882538663bc4af
Author: ajantha-bhat <aj...@gmail.com>
AuthorDate: Mon Jan 21 17:47:22 2019 +0530

    [HOTFIX] presto carbon doesn't work with Hadoop conf in cluster.
    
    problem : presto carbon doesn't work with Hadoop conf in cluster.
    
    cause:
    When presto queries are run in cluster, it fails with below message.
    IllegalArgumentException java.net.UnknownHostException: hacluster
    configuration from hdfsEnvironment is not used while checking schema path. hence the file factory is throwing exception.
    
    solution: set the configuration while checking schema path and other places in presto
    
    This closes #3089
---
 .../carbondata/core/util/path/CarbonTablePath.java |  23 ++++-
 .../carbondata/presto/impl/CarbonTableReader.java  | 105 +++++++++++----------
 2 files changed, 76 insertions(+), 52 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index 8538e37..da558be 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -25,6 +25,8 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.locks.LockUsage;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * Helps to get Table content paths.
  */
@@ -175,12 +177,27 @@ public class CarbonTablePath {
    * @return schema file path
    */
   public static String getSchemaFilePath(String tablePath) {
-    return getActualSchemaFilePath(tablePath);
+    return getActualSchemaFilePath(tablePath, null);
+  }
+
+  /**
+   * return the schema file path
+   * @param tablePath path to table files
+   * @param hadoopConf hadoop configuration instance
+   * @return schema file path
+   */
+  public static String getSchemaFilePath(String tablePath, Configuration hadoopConf) {
+    return getActualSchemaFilePath(tablePath, hadoopConf);
   }
 
-  private static String getActualSchemaFilePath(String tablePath) {
+  private static String getActualSchemaFilePath(String tablePath, Configuration hadoopConf) {
     String metaPath = tablePath + CarbonCommonConstants.FILE_SEPARATOR + METADATA_DIR;
-    CarbonFile carbonFile = FileFactory.getCarbonFile(metaPath);
+    CarbonFile carbonFile;
+    if (hadoopConf != null) {
+      carbonFile = FileFactory.getCarbonFile(metaPath, hadoopConf);
+    } else {
+      carbonFile = FileFactory.getCarbonFile(metaPath);
+    }
     CarbonFile[] schemaFile = carbonFile.listFiles(new CarbonFileFilter() {
       @Override public boolean accept(CarbonFile file) {
         return file.getName().startsWith(SCHEMA_FILE);
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index 1121a37..916e44c 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -168,60 +168,59 @@ public class CarbonTableReader {
   private CarbonTableCacheModel parseCarbonMetadata(SchemaTableName table, String tablePath,
       Configuration config) {
     try {
-      CarbonTableCacheModel cache = carbonCache.get().get(table);
-      if (cache != null && cache.isValid()) {
+      CarbonTableCacheModel cache = getValidCacheBySchemaTableName(table);
+      if (cache != null) {
         return cache;
       }
-      // Step 1: get store path of the table and cache it.
-      String schemaFilePath = CarbonTablePath.getSchemaFilePath(tablePath);
-      // If metadata folder exists, it is a transactional table
-      CarbonFile schemaFile = FileFactory.getCarbonFile(schemaFilePath, config);
-      boolean isTransactionalTable = schemaFile.exists();
-      org.apache.carbondata.format.TableInfo tableInfo;
-      long modifiedTime = System.currentTimeMillis();
-      if (isTransactionalTable) {
-        //Step 2: read the metadata (tableInfo) of the table.
-        ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
-          // TBase is used to read and write thrift objects.
-          // TableInfo is a kind of TBase used to read and write table information.
-          // TableInfo is generated by thrift,
-          // see schema.thrift under format/src/main/thrift for details.
-          public TBase create() {
-            return new org.apache.carbondata.format.TableInfo();
-          }
-        };
-        ThriftReader thriftReader =
-            new ThriftReader(schemaFilePath, createTBase, config);
-        thriftReader.open();
-        tableInfo = (org.apache.carbondata.format.TableInfo) thriftReader.read();
-        thriftReader.close();
-        modifiedTime = schemaFile.getLastModifiedTime();
-      } else {
-        tableInfo = CarbonUtil
-            .inferSchema(tablePath, table.getTableName(), false, config);
-      }
-      // Step 3: convert format level TableInfo to code level TableInfo
-      SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
-      // wrapperTableInfo is the code level information of a table in carbondata core,
-      // different from the Thrift TableInfo.
-      TableInfo wrapperTableInfo = schemaConverter
-          .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
-              tablePath);
-
-      wrapperTableInfo.setTransactionalTable(isTransactionalTable);
-
-      CarbonMetadata.getInstance().removeTable(wrapperTableInfo.getTableUniqueName());
-      // Step 4: Load metadata info into CarbonMetadata
-      CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
-      CarbonTable carbonTable = Objects.requireNonNull(
-          CarbonMetadata.getInstance().getCarbonTable(table.getSchemaName(), table.getTableName()),
-          "carbontable is null");
-      // If table is not previously cached, then:
-      if (cache == null) {
+      // multiple tasks can be launched in a worker concurrently. Hence need to synchronize this.
+      synchronized (this) {
+        // cache might be filled by another thread, so if filled use that cache.
+        CarbonTableCacheModel cacheModel = getValidCacheBySchemaTableName(table);
+        if (cacheModel != null) {
+          return cacheModel;
+        }
+        // Step 1: get store path of the table and cache it.
+        String schemaFilePath = CarbonTablePath.getSchemaFilePath(tablePath, config);
+        // If metadata folder exists, it is a transactional table
+        CarbonFile schemaFile = FileFactory.getCarbonFile(schemaFilePath, config);
+        boolean isTransactionalTable = schemaFile.exists();
+        org.apache.carbondata.format.TableInfo tableInfo;
+        long modifiedTime = System.currentTimeMillis();
+        if (isTransactionalTable) {
+          //Step 2: read the metadata (tableInfo) of the table.
+          ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
+            // TBase is used to read and write thrift objects.
+            // TableInfo is a kind of TBase used to read and write table information.
+            // TableInfo is generated by thrift,
+            // see schema.thrift under format/src/main/thrift for details.
+            public TBase create() {
+              return new org.apache.carbondata.format.TableInfo();
+            }
+          };
+          ThriftReader thriftReader = new ThriftReader(schemaFilePath, createTBase, config);
+          thriftReader.open();
+          tableInfo = (org.apache.carbondata.format.TableInfo) thriftReader.read();
+          thriftReader.close();
+          modifiedTime = schemaFile.getLastModifiedTime();
+        } else {
+          tableInfo = CarbonUtil.inferSchema(tablePath, table.getTableName(), false, config);
+        }
+        // Step 3: convert format level TableInfo to code level TableInfo
+        SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+        // wrapperTableInfo is the code level information of a table in carbondata core,
+        // different from the Thrift TableInfo.
+        TableInfo wrapperTableInfo = schemaConverter
+            .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
+                tablePath);
+        wrapperTableInfo.setTransactionalTable(isTransactionalTable);
+        CarbonMetadata.getInstance().removeTable(wrapperTableInfo.getTableUniqueName());
+        // Step 4: Load metadata info into CarbonMetadata
+        CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
+        CarbonTable carbonTable = Objects.requireNonNull(CarbonMetadata.getInstance()
+            .getCarbonTable(table.getSchemaName(), table.getTableName()), "carbontable is null");
         cache = new CarbonTableCacheModel(modifiedTime, carbonTable);
         // cache the table
         carbonCache.get().put(table, cache);
-      } else {
         cache.setCarbonTable(carbonTable);
       }
       return cache;
@@ -230,6 +229,14 @@ public class CarbonTableReader {
     }
   }
 
+  private CarbonTableCacheModel getValidCacheBySchemaTableName(SchemaTableName schemaTableName) {
+    CarbonTableCacheModel cache = carbonCache.get().get(schemaTableName);
+    if (cache != null && cache.isValid()) {
+      return cache;
+    }
+    return null;
+  }
+
   public List<CarbonLocalMultiBlockSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
       Expression filters, TupleDomain<HiveColumnHandle> constraints, Configuration config)
       throws IOException {


[carbondata] 04/27: [DOC] Update range_info partition example in ddl description

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit 7b66f4962129b480504fd6210d25061c123e8a0c
Author: qiuchenjian <80...@qq.com>
AuthorDate: Tue Jan 22 10:36:30 2019 +0800

    [DOC] Update range_info partition example in ddl description
    
    This closes #3092
---
 docs/ddl-of-carbondata.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/ddl-of-carbondata.md b/docs/ddl-of-carbondata.md
index b9b391b..a1b0ce7 100644
--- a/docs/ddl-of-carbondata.md
+++ b/docs/ddl-of-carbondata.md
@@ -950,7 +950,7 @@ Users can specify which columns to include and exclude for local dictionary gene
       col_D DECIMAL(10,2),
       col_E LONG
   ) partitioned by (col_F Timestamp)
-  PARTITIONED BY 'carbondata'
+  STORED BY 'carbondata'
   TBLPROPERTIES('PARTITION_TYPE'='RANGE',
   'RANGE_INFO'='2015-01-01, 2016-01-01, 2017-01-01, 2017-02-01')
   ```


[carbondata] 26/27: [CARBONDATA-3280] Fix the issue of SDK assert can't work

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit 2500a902596f1cc0975c5c486293c9aa2b84b73a
Author: xubo245 <xu...@huawei.com>
AuthorDate: Tue Jan 29 11:36:48 2019 +0800

    [CARBONDATA-3280] Fix the issue of SDK assert can't work
    
    After PR-3097 merged, the batch rule has been changed, but the test didn't work, such as:
    
    org.apache.carbondata.sdk.file.CarbonReaderTest#testReadNextBatchRow
    org.apache.carbondata.sdk.file.CarbonReaderTest#testReadNextBatchRowWithVectorReader
    So this PR fixed the test error and add some assert
    
    This closes #3112
---
 .../carbondata/core/util/CarbonProperties.java     |  2 +-
 .../carbondata/sdk/file/CarbonReaderTest.java      | 90 +++++++++++++++-------
 2 files changed, 63 insertions(+), 29 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 49388b7..b337e40 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1572,7 +1572,7 @@ public final class CarbonProperties {
       try {
         batchSize = Integer.parseInt(batchSizeString);
         if (batchSize < DETAIL_QUERY_BATCH_SIZE_MIN || batchSize > DETAIL_QUERY_BATCH_SIZE_MAX) {
-          LOGGER.info("Invalid carbon.detail.batch.size.Using default value "
+          LOGGER.warn("Invalid carbon.detail.batch.size.Using default value "
               + DETAIL_QUERY_BATCH_SIZE_DEFAULT);
           carbonProperties.setProperty(DETAIL_QUERY_BATCH_SIZE,
               Integer.toString(DETAIL_QUERY_BATCH_SIZE_DEFAULT));
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
index 28944da..871d51b 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
@@ -104,7 +104,7 @@ public class CarbonReaderTest extends TestCase {
     FileUtils.deleteDirectory(new File(path));
   }
 
-  @Test public void testReadWithZeroBatchSize() throws IOException, InterruptedException {
+  @Test public void testReadWithZeroBatchSize() throws Exception {
     String path = "./testWriteFiles";
     FileUtils.deleteDirectory(new File(path));
     DataMapStoreManager.getInstance().clearDataMaps(AbsoluteTableIdentifier.from(path));
@@ -127,6 +127,30 @@ public class CarbonReaderTest extends TestCase {
     FileUtils.deleteDirectory(new File(path));
   }
 
+
+  @Test
+  public void testReadBatchWithZeroBatchSize() throws Exception {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+    DataMapStoreManager.getInstance().clearDataMaps(AbsoluteTableIdentifier.from(path));
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    TestUtil.writeFilesAndVerify(10, new Schema(fields), path);
+    CarbonReader reader;
+    reader = CarbonReader.builder(path).withRowRecordReader().withBatch(0).build();
+
+    int i = 0;
+    while (reader.hasNext()) {
+      Object[] row = reader.readNextBatchRow();
+      Assert.assertEquals(row.length, 10);
+      i++;
+    }
+    Assert.assertEquals(i, 1);
+    FileUtils.deleteDirectory(new File(path));
+  }
+
   @Test
   public void testReadWithFilterOfNonTransactionalSimple() throws IOException, InterruptedException {
     String path = "./testWriteFiles";
@@ -532,6 +556,7 @@ public class CarbonReaderTest extends TestCase {
   .withCsvInput(schema).writtenBy("CarbonReaderTest").build();
     } catch (InvalidLoadOptionException e) {
       e.printStackTrace();
+      Assert.fail(e.getMessage());
     }
     carbonWriter.write(new String[] { "MNO", "100" });
     carbonWriter.close();
@@ -546,22 +571,25 @@ public class CarbonReaderTest extends TestCase {
    .withCsvInput(schema1).writtenBy("CarbonReaderTest").build();
     } catch (InvalidLoadOptionException e) {
       e.printStackTrace();
+      Assert.fail(e.getMessage());
     }
     carbonWriter1.write(new String[] { "PQR", "200" });
     carbonWriter1.close();
 
     try {
-       CarbonReader reader =
-       CarbonReader.builder(path1, "_temp").
-       projection(new String[] { "c1", "c3" })
-       .build();
-    } catch (Exception e){
-       System.out.println("Success");
+      CarbonReader reader =
+          CarbonReader.builder(path1, "_temp")
+              .projection(new String[]{"c1", "c3"})
+              .build();
+      Assert.fail();
+    } catch (Exception e) {
+      System.out.println("Success");
+      Assert.assertTrue(true);
     }
     CarbonReader reader1 =
-         CarbonReader.builder(path2, "_temp1")
-     .projection(new String[] { "p1", "p2" })
-     .build();
+        CarbonReader.builder(path2, "_temp1")
+            .projection(new String[]{"p1", "p2"})
+            .build();
 
     while (reader1.hasNext()) {
        Object[] row1 = (Object[]) reader1.readNextRow();
@@ -1292,6 +1320,7 @@ public class CarbonReaderTest extends TestCase {
       WriteAvroComplexData(mySchema, json, path);
     } catch (InvalidLoadOptionException e) {
       e.printStackTrace();
+      Assert.fail(e.getMessage());
     }
 
     File folder = new File(path);
@@ -1357,6 +1386,7 @@ public class CarbonReaderTest extends TestCase {
       WriteAvroComplexData(mySchema, json, path);
     } catch (InvalidLoadOptionException e) {
       e.printStackTrace();
+      Assert.fail(e.getMessage());
     }
 
     Field[] fields = new Field[3];
@@ -1509,6 +1539,7 @@ public class CarbonReaderTest extends TestCase {
       FileUtils.deleteDirectory(new File(path));
     } catch (Throwable e) {
       e.printStackTrace();
+      Assert.fail(e.getMessage());
     }
   }
 
@@ -1833,9 +1864,9 @@ public class CarbonReaderTest extends TestCase {
           .writtenBy("CarbonReaderTest")
           .build();
 
-      for (int i = 0; i < 10; i++) {
+      for (int i = 0; i < 300; i++) {
         String[] row2 = new String[]{
-            "robot" + (i % 10),
+            "robot" + (i % 10000),
             String.valueOf(i % 10000),
             String.valueOf(i),
             String.valueOf(Long.MAX_VALUE - i),
@@ -1852,11 +1883,11 @@ public class CarbonReaderTest extends TestCase {
       }
       writer.close();
 
-      // Read data
-      int batchSize =4;
+        // Read data
+      int batchSize = 150;
       CarbonReader reader = CarbonReader
           .builder(path, "_temp")
-          .withBatch(4)
+          .withBatch(batchSize)
           .build();
 
       int i = 0;
@@ -1889,6 +1920,7 @@ public class CarbonReaderTest extends TestCase {
       reader.close();
     } catch (Throwable e) {
       e.printStackTrace();
+      Assert.fail(e.getMessage());
     } finally {
       try {
         FileUtils.deleteDirectory(new File(path));
@@ -1897,6 +1929,7 @@ public class CarbonReaderTest extends TestCase {
       }
     }
   }
+
   @Test
   public void testReadNextBatchRowWithVectorReader() {
     String path = "./carbondata";
@@ -1926,9 +1959,9 @@ public class CarbonReaderTest extends TestCase {
           .writtenBy("CarbonReaderTest")
           .build();
 
-      for (int i = 0; i < 10; i++) {
+      for (int i = 0; i < 300; i++) {
         String[] row2 = new String[]{
-            "robot" + (i % 10),
+            "robot" + (i % 10000),
             String.valueOf(i % 10000),
             String.valueOf(i),
             String.valueOf(Long.MAX_VALUE - i),
@@ -1945,10 +1978,10 @@ public class CarbonReaderTest extends TestCase {
       writer.close();
 
       // Read data
-      int batchSize =4;
+      int batchSize = 150;
       CarbonReader reader = CarbonReader
           .builder(path, "_temp")
-          .withBatch(4)
+          .withBatch(batchSize)
           .build();
 
       int i = 0;
@@ -1975,6 +2008,7 @@ public class CarbonReaderTest extends TestCase {
       reader.close();
     } catch (Throwable e) {
       e.printStackTrace();
+      Assert.fail(e.getMessage());
     } finally {
       try {
         FileUtils.deleteDirectory(new File(path));
@@ -2114,9 +2148,9 @@ public class CarbonReaderTest extends TestCase {
           .build();
     } catch (IllegalArgumentException e) {
       e.printStackTrace();
-      Assert.fail();
+      Assert.fail(e.getMessage());
     } catch (Exception e) {
-      Assert.fail();
+      Assert.fail(e.getMessage());
     } finally {
       FileUtils.deleteDirectory(new File(path));
     }
@@ -2144,7 +2178,7 @@ public class CarbonReaderTest extends TestCase {
       Assert.assertTrue(e.getMessage().contains(
           "Invalid value FLSE for key bad_records_logger_enable"));
     } catch (Exception e) {
-      Assert.fail();
+      Assert.fail(e.getMessage());
     } finally {
       FileUtils.deleteDirectory(new File(path));
     }
@@ -2171,7 +2205,7 @@ public class CarbonReaderTest extends TestCase {
       e.printStackTrace();
       Assert.fail();
     } catch (Exception e) {
-      Assert.fail();
+      Assert.fail(e.getMessage());
     } finally {
       FileUtils.deleteDirectory(new File(path));
     }
@@ -2199,7 +2233,7 @@ public class CarbonReaderTest extends TestCase {
       Assert.assertTrue(e.getMessage().contains(
           "QUOTECHAR cannot be more than one character."));
     } catch (Exception e) {
-      Assert.fail();
+      Assert.fail(e.getMessage());
     } finally {
       FileUtils.deleteDirectory(new File(path));
     }
@@ -2226,7 +2260,7 @@ public class CarbonReaderTest extends TestCase {
       e.printStackTrace();
       Assert.fail();
     } catch (Exception e) {
-      Assert.fail();
+      Assert.fail(e.getMessage());
     } finally {
       FileUtils.deleteDirectory(new File(path));
     }
@@ -2254,7 +2288,7 @@ public class CarbonReaderTest extends TestCase {
       Assert.assertTrue(e.getMessage().contains(
           "ESCAPECHAR cannot be more than one character."));
     } catch (Exception e) {
-      Assert.fail();
+      Assert.fail(e.getMessage());
     } finally {
       FileUtils.deleteDirectory(new File(path));
     }
@@ -2279,9 +2313,9 @@ public class CarbonReaderTest extends TestCase {
           .build();
     } catch (IllegalArgumentException e) {
       e.printStackTrace();
-      Assert.fail();
+      Assert.fail(e.getMessage());
     } catch (Exception e) {
-      Assert.fail();
+      Assert.fail(e.getMessage());
     } finally {
       FileUtils.deleteDirectory(new File(path));
     }


[carbondata] 23/27: [HOTFIX] Upgraded jars to work S3 with presto

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit 5b4cd18432ee89a7705e593da6d001e6c7b28d6e
Author: ravipesala <ra...@gmail.com>
AuthorDate: Sun Jan 27 15:12:29 2019 +0530

    [HOTFIX] Upgraded jars to work S3 with presto
    
    There is a duplicate jar aws-java-sdk and low version jars avoid connecting to S3 in presto. Those jars are upgraded in this PR and updated doc.
    
    This closes #3110
---
 .../statusmanager/SegmentUpdateStatusManager.java  |  3 ++-
 docs/presto-guide.md                               | 18 ++++-----------
 integration/presto/pom.xml                         | 27 ++++++++--------------
 3 files changed, 16 insertions(+), 32 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index c5f5f74..a02e903 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -52,6 +52,7 @@ import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import com.google.gson.Gson;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Logger;
 
 /**
@@ -655,7 +656,7 @@ public class SegmentUpdateStatusManager {
     // get the updated status file identifier from the table status.
     String tableUpdateStatusIdentifier = getUpdatedStatusIdentifier();
 
-    if (null == tableUpdateStatusIdentifier) {
+    if (StringUtils.isEmpty(tableUpdateStatusIdentifier)) {
       return new SegmentUpdateDetails[0];
     }
 
diff --git a/docs/presto-guide.md b/docs/presto-guide.md
index 054f29f..7389bc6 100644
--- a/docs/presto-guide.md
+++ b/docs/presto-guide.md
@@ -254,23 +254,15 @@ Now you can use the Presto CLI on the coordinator to query data sources in the c
    ```
     Required properties
 
-    fs.s3a.access.key={value}
-    fs.s3a.secret.key={value}
+    hive.s3.aws-access-key={value}
+    hive.s3.aws-secret-key={value}
     
     Optional properties
     
-    fs.s3a.endpoint={value}
+    hive.s3.endpoint={value}
    ```
- * In case you want to query carbonstore on s3 using S3 api put following additional properties inside $PRESTO_HOME$/etc/catalog/carbondata.properties 
-    ```
-      fs.s3.awsAccessKeyId={value}
-      fs.s3.awsSecretAccessKey={value}
-    ```
-  * In case You want to query carbonstore on s3 using S3N api put following additional properties inside $PRESTO_HOME$/etc/catalog/carbondata.properties 
-    ```
-        fs.s3n.awsAccessKeyId={value}
-        fs.s3n.awsSecretAccessKey={value}
-     ```
+   
+   Please refer <a>https://prestodb.io/docs/current/connector/hive.html</a> for more details on S3 integration.
     
 ### Generate CarbonData file
 
diff --git a/integration/presto/pom.xml b/integration/presto/pom.xml
index fd955dd..0484781 100644
--- a/integration/presto/pom.xml
+++ b/integration/presto/pom.xml
@@ -32,6 +32,7 @@
 
   <properties>
     <presto.version>0.210</presto.version>
+    <httpcore.version>4.4.9</httpcore.version>
     <dev.path>${basedir}/../../dev</dev.path>
     <jacoco.append>true</jacoco.append>
   </properties>
@@ -376,7 +377,7 @@
     <dependency>
       <groupId>com.facebook.presto.hadoop</groupId>
       <artifactId>hadoop-apache2</artifactId>
-      <version>2.7.3-1</version>
+      <version>2.7.4-3</version>
       <exclusions>
         <exclusion>
           <groupId>org.antlr</groupId>
@@ -522,23 +523,8 @@
           <artifactId>jackson-core</artifactId>
         </exclusion>
         <exclusion>
-          <groupId>com.fasterxml.jackson.core</groupId>
-          <artifactId>jackson-annotations</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.fasterxml.jackson.core</groupId>
-          <artifactId>jackson-databind</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>com.amazonaws</groupId>
-      <artifactId>aws-java-sdk</artifactId>
-      <version>1.7.4</version>
-      <exclusions>
-        <exclusion>
-          <groupId>com.fasterxml.jackson.core</groupId>
-          <artifactId>jackson-core</artifactId>
+          <groupId>com.amazonaws</groupId>
+          <artifactId>aws-java-sdk</artifactId>
         </exclusion>
         <exclusion>
           <groupId>com.fasterxml.jackson.core</groupId>
@@ -560,6 +546,11 @@
       <artifactId>httpcore</artifactId>
       <version>${httpcore.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpclient</artifactId>
+      <version>4.5.5</version>
+    </dependency>
   </dependencies>
 
   <build>


[carbondata] 25/27: [HOTFIX] SDV framework for presto cluster test suite

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit 64ae35a02c9ce1d59be29353d35367ca6e2d20f6
Author: ajantha-bhat <aj...@gmail.com>
AuthorDate: Mon Jan 28 10:14:53 2019 +0530

    [HOTFIX] SDV framework for presto cluster test suite
    
    [HOTFIX] SDV framework for presto cluster test suite
    a) Added a suite for presto cluster test with a sample test case where carbon presto reads the store created by spark.
    b) When single suite selected for running. other module test cases were running like SDK, CLI, processing. Fixed this problem adding sdvtest profile modules that has issues
    
    This closes #3111
---
 common/pom.xml                                     |  8 ++
 examples/flink/pom.xml                             |  8 ++
 format/pom.xml                                     |  8 ++
 .../PrestoAllDataTypeLocalDictTest.scala           |  2 +-
 .../integrationtest/PrestoAllDataTypeTest.scala    |  2 +-
 .../PrestoTestNonTransactionalTableFiles.scala     |  2 +-
 .../carbondata/presto/server/PrestoServer.scala    | 26 +++---
 integration/spark-common-cluster-test/pom.xml      | 17 ++++
 .../sdv/generated/PrestoSampleTestCase.scala       | 56 +++++++++++++
 .../carbondata/cluster/sdv/suite/SDVSuites.scala   | 16 ++++
 .../apache/spark/sql/common/util/QueryTest.scala   | 97 ++++++++++++++++++++--
 .../spark/sql/test/Spark2TestQueryExecutor.scala   |  5 ++
 pom.xml                                            |  2 +
 processing/pom.xml                                 |  8 ++
 store/sdk/pom.xml                                  |  8 ++
 streaming/pom.xml                                  |  8 ++
 tools/cli/pom.xml                                  |  8 ++
 17 files changed, 255 insertions(+), 26 deletions(-)

diff --git a/common/pom.xml b/common/pom.xml
index e10b48b..e386d2b 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -68,5 +68,13 @@
       </plugin>
     </plugins>
   </build>
+  <profiles>
+    <profile>
+      <id>sdvtest</id>
+      <properties>
+        <maven.test.skip>true</maven.test.skip>
+      </properties>
+    </profile>
+  </profiles>
 
 </project>
diff --git a/examples/flink/pom.xml b/examples/flink/pom.xml
index 8d44e82..aae7b5e 100644
--- a/examples/flink/pom.xml
+++ b/examples/flink/pom.xml
@@ -74,5 +74,13 @@
       </plugin>
     </plugins>
   </build>
+  <profiles>
+    <profile>
+      <id>sdvtest</id>
+      <properties>
+        <maven.test.skip>true</maven.test.skip>
+      </properties>
+    </profile>
+  </profiles>
 
 </project>
\ No newline at end of file
diff --git a/format/pom.xml b/format/pom.xml
index e923bfb..ecc9cc7 100644
--- a/format/pom.xml
+++ b/format/pom.xml
@@ -68,5 +68,13 @@
       </plugin>
     </plugins>
   </build>
+  <profiles>
+    <profile>
+      <id>sdvtest</id>
+      <properties>
+        <maven.test.skip>true</maven.test.skip>
+      </properties>
+    </profile>
+  </profiles>
 
 </project>
\ No newline at end of file
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoAllDataTypeLocalDictTest.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoAllDataTypeLocalDictTest.scala
index 4360977..2735969 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoAllDataTypeLocalDictTest.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoAllDataTypeLocalDictTest.scala
@@ -79,7 +79,7 @@ class PrestoAllDataTypeLocalDictTest extends FunSuiteLike with BeforeAndAfterAll
     map.put("hive.metastore", "file")
     map.put("hive.metastore.catalog.dir", s"file://$storePath")
 
-    prestoServer.startServer(storePath, "testdb", map)
+    prestoServer.startServer("testdb", map)
     prestoServer.execute("drop table if exists testdb.testtable")
     prestoServer.execute("drop schema if exists testdb")
     prestoServer.execute("create schema testdb")
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoAllDataTypeTest.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoAllDataTypeTest.scala
index 17490e4..205469c 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoAllDataTypeTest.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoAllDataTypeTest.scala
@@ -80,7 +80,7 @@ class PrestoAllDataTypeTest extends FunSuiteLike with BeforeAndAfterAll {
     map.put("hive.metastore", "file")
     map.put("hive.metastore.catalog.dir", s"file://$storePath")
 
-    prestoServer.startServer(storePath, "testdb", map)
+    prestoServer.startServer("testdb", map)
     prestoServer.execute("drop table if exists testdb.testtable")
     prestoServer.execute("drop schema if exists testdb")
     prestoServer.execute("create schema testdb")
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoTestNonTransactionalTableFiles.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoTestNonTransactionalTableFiles.scala
index 6d17b8b..bdee4a1 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoTestNonTransactionalTableFiles.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoTestNonTransactionalTableFiles.scala
@@ -57,7 +57,7 @@ class PrestoTestNonTransactionalTableFiles extends FunSuiteLike with BeforeAndAf
     map.put("hive.metastore", "file")
     map.put("hive.metastore.catalog.dir", s"file://$storePath")
 
-    prestoServer.startServer(storePath, "sdk_output", map)
+    prestoServer.startServer("sdk_output", map)
   }
 
   override def afterAll(): Unit = {
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala
index 0bde313..672e90f 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala
@@ -25,6 +25,7 @@ import scala.util.{Failure, Success, Try}
 
 import com.facebook.presto.Session
 import com.facebook.presto.execution.QueryIdGenerator
+import com.facebook.presto.jdbc.PrestoStatement
 import com.facebook.presto.metadata.SessionPropertyManager
 import com.facebook.presto.spi.`type`.TimeZoneKey.UTC_KEY
 import com.facebook.presto.spi.security.Identity
@@ -47,18 +48,17 @@ class PrestoServer {
   createSession
   lazy val queryRunner = new DistributedQueryRunner(createSession, 4, prestoProperties)
   var dbName : String = null
+  var statement : PrestoStatement = _
 
 
   /**
    * start the presto server
    *
-   * @param carbonStorePath the store path of carbon
    */
-  def startServer(carbonStorePath: String): Unit = {
+  def startServer(): Unit = {
 
     LOGGER.info("======== STARTING PRESTO SERVER ========")
-    val queryRunner: DistributedQueryRunner = createQueryRunner(
-      prestoProperties, carbonStorePath)
+    val queryRunner: DistributedQueryRunner = createQueryRunner(prestoProperties)
 
     LOGGER.info("STARTED SERVER AT :" + queryRunner.getCoordinator.getBaseUrl)
   }
@@ -66,25 +66,23 @@ class PrestoServer {
   /**
    * start the presto server
    *
-   * @param carbonStorePath the store path of carbon
    * @param dbName the database name, if not a default database
    */
-  def startServer(carbonStorePath: String, dbName: String, properties: util.Map[String, String]= new util.HashMap[String, String]()): Unit = {
+  def startServer(dbName: String, properties: util.Map[String, String] = new util.HashMap[String, String]()): Unit = {
 
     this.dbName = dbName
     carbonProperties.putAll(properties)
     LOGGER.info("======== STARTING PRESTO SERVER ========")
-    val queryRunner: DistributedQueryRunner = createQueryRunner(
-      prestoProperties, carbonStorePath)
-
+    val queryRunner: DistributedQueryRunner = createQueryRunner(prestoProperties)
+    val conn: Connection = createJdbcConnection(dbName)
+    statement = conn.createStatement().asInstanceOf[PrestoStatement]
     LOGGER.info("STARTED SERVER AT :" + queryRunner.getCoordinator.getBaseUrl)
   }
 
   /**
    * Instantiates the Presto Server to connect with the Apache CarbonData
    */
-  private def createQueryRunner(extraProperties: util.Map[String, String],
-      carbonStorePath: String): DistributedQueryRunner = {
+  private def createQueryRunner(extraProperties: util.Map[String, String]) = {
     Try {
       queryRunner.installPlugin(new CarbondataPlugin)
       val carbonProperties = ImmutableMap.builder[String, String]
@@ -105,6 +103,7 @@ class PrestoServer {
    */
   def stopServer(): Unit = {
     queryRunner.close()
+    statement.close()
     LOGGER.info("***** Stopping The Server *****")
   }
 
@@ -117,9 +116,7 @@ class PrestoServer {
   def executeQuery(query: String): List[Map[String, Any]] = {
 
     Try {
-      val conn: Connection = createJdbcConnection(dbName)
       LOGGER.info(s"***** executing the query ***** \n $query")
-      val statement = conn.createStatement()
       val result: ResultSet = statement.executeQuery(query)
       convertResultSetToList(result)
     } match {
@@ -131,11 +128,8 @@ class PrestoServer {
   }
 
   def execute(query: String) = {
-
     Try {
-      val conn: Connection = createJdbcConnection(dbName)
       LOGGER.info(s"***** executing the query ***** \n $query")
-      val statement = conn.createStatement()
       statement.execute(query)
     } match {
       case Success(result) => result
diff --git a/integration/spark-common-cluster-test/pom.xml b/integration/spark-common-cluster-test/pom.xml
index 23acfd8..a6c85ad 100644
--- a/integration/spark-common-cluster-test/pom.xml
+++ b/integration/spark-common-cluster-test/pom.xml
@@ -80,6 +80,21 @@
       <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.facebook.presto</groupId>
+      <artifactId>presto-jdbc</artifactId>
+      <version>0.210</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.antlr</groupId>
+          <artifactId>antlr4-runtime</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
   </dependencies>
 
   <build>
@@ -164,6 +179,8 @@
             <java.awt.headless>true</java.awt.headless>
             <spark.master.url>${spark.master.url}</spark.master.url>
             <hdfs.url>${hdfs.url}</hdfs.url>
+            <presto.jdbc.url>${presto.jdbc.url}</presto.jdbc.url>
+            <spark.hadoop.hive.metastore.uris>${spark.hadoop.hive.metastore.uris}</spark.hadoop.hive.metastore.uris>
             <spark.carbon.hive.schema.store>${carbon.hive.based.metastore}</spark.carbon.hive.schema.store>
           </systemProperties>
         </configuration>
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/PrestoSampleTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/PrestoSampleTestCase.scala
new file mode 100644
index 0000000..336f8bc
--- /dev/null
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/PrestoSampleTestCase.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.cluster.sdv.generated
+
+import org.apache.spark.sql.common.util._
+import org.scalatest.BeforeAndAfterAll
+
+class PrestoSampleTestCase extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    sql("DROP TABLE IF EXISTS sample_table")
+    if (System.getProperty("spark.master.url") != null) {
+    QueryTest.PrestoQueryTest.initJdbcConnection("default")
+    }
+  }
+
+  test("test read spark store from presto ") {
+    sql("show tables").show(false)
+
+    sql("DROP TABLE IF EXISTS sample_table")
+    sql("CREATE TABLE sample_table (name string) STORED BY 'carbondata'")
+    sql("insert into sample_table select 'ajantha'")
+    sql("select * from sample_table ").show(200, false)
+    sql("describe formatted sample_table ").show(200, false)
+    if (System.getProperty("spark.master.url") != null) {
+      // supports only running through cluster
+      val actualResult: List[Map[String, Any]] = QueryTest.PrestoQueryTest
+              .executeQuery("select * from sample_table")
+     println("ans---------" + actualResult(0).toString())
+      val expectedResult: List[Map[String, Any]] = List(Map(
+        "name" -> "ajantha"))
+      assert(actualResult.toString() equals expectedResult.toString())
+    }
+  }
+
+  override def afterAll {
+    sql("DROP TABLE IF EXISTS sample_table")
+    QueryTest.PrestoQueryTest.closeJdbcConnection()
+  }
+
+}
\ No newline at end of file
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala
index f2ae2cb..5367e0d 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala
@@ -187,3 +187,19 @@ class SDVSuites4 extends Suites with BeforeAndAfterAll {
     println("---------------- Stopped spark -----------------")
   }
 }
+
+/**
+ * Suite class for presto tests
+ */
+class SDVSuites5 extends Suites with BeforeAndAfterAll {
+
+  val suites =  new PrestoSampleTestCase :: Nil
+
+  override val nestedSuites = suites.toIndexedSeq
+
+  override protected def afterAll() = {
+    println("---------------- Stopping spark -----------------")
+    TestQueryExecutor.INSTANCE.stop()
+    println("---------------- Stopped spark -----------------")
+  }
+}
\ No newline at end of file
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
index 39beae1..9d4fe79 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
@@ -18,23 +18,25 @@
 package org.apache.spark.sql.common.util
 
 import java.io.{ObjectInputStream, ObjectOutputStream}
-import java.util.{Locale, TimeZone}
+import java.sql.{Connection, DriverManager, ResultSet}
+import java.util.{Locale, Properties}
 
-import org.apache.carbondata.common.logging.LogServiceFactory
 import scala.collection.JavaConversions._
+import scala.util.{Failure, Success, Try}
 
+import com.facebook.presto.jdbc.{PrestoConnection, PrestoStatement}
 import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexReplaceRule
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.command.LoadDataCommand
 import org.apache.spark.sql.hive.CarbonSessionCatalog
 import org.apache.spark.sql.test.{ResourceRegisterAndCopier, TestQueryExecutor}
-import org.apache.spark.sql.{CarbonSession, DataFrame, Row, SQLContext}
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 import org.scalatest.Suite
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.commons.lang.StringUtils
 
 class QueryTest extends PlanTest with Suite {
 
@@ -154,8 +156,6 @@ object QueryTest {
     }
   }
 
-  import java.text.DecimalFormat
-
   /**
    * Runs the plan and makes sure the answer matches the expected result.
    * If there was exception during the execution or the contents of the DataFrame does not
@@ -220,4 +220,87 @@ object QueryTest {
 
     return None
   }
+
+  object PrestoQueryTest {
+
+    var statement : PrestoStatement = _
+
+    def initJdbcConnection(dbName: String): Unit = {
+      val conn: Connection = if (System.getProperty("presto.jdbc.url") != null) {
+        createJdbcConnection(dbName, System.getProperty("presto.jdbc.url"))
+      } else {
+        createJdbcConnection(dbName, "localhost:8086")
+      }
+      statement = conn.createStatement().asInstanceOf[PrestoStatement]
+    }
+
+    def closeJdbcConnection(): Unit = {
+      statement.close()
+    }
+
+    /**
+     * execute the query by establishing the jdbc connection
+     *
+     * @param query
+     * @return
+     */
+    def executeQuery(query: String): List[Map[String, Any]] = {
+      Try {
+        val result: ResultSet = statement.executeQuery(query)
+        convertResultSetToList(result)
+      } match {
+        case Success(result) => result
+        case Failure(jdbcException) =>
+          throw jdbcException
+      }
+    }
+
+    /**
+     * Creates a JDBC Client to connect CarbonData to Presto
+     *
+     * @return
+     */
+    private def createJdbcConnection(dbName: String, url: String) = {
+      val JDBC_DRIVER = "com.facebook.presto.jdbc.PrestoDriver"
+      var DB_URL : String = null
+      if (StringUtils.isEmpty(dbName)) {
+        DB_URL = "jdbc:presto://"+ url + "/carbondata/default"
+      } else {
+        DB_URL = "jdbc:presto://" + url + "/carbondata/" + dbName
+      }
+      val properties = new Properties
+      // The database Credentials
+      properties.setProperty("user", "test")
+
+      // STEP 2: Register JDBC driver
+      Class.forName(JDBC_DRIVER)
+      // STEP 3: Open a connection
+      DriverManager.getConnection(DB_URL, properties)
+    }
+
+    /**
+     * convert result set into scala list of map
+     * each map represents a row
+     *
+     * @param queryResult
+     * @return
+     */
+    private def convertResultSetToList(queryResult: ResultSet): List[Map[String, Any]] = {
+      val metadata = queryResult.getMetaData
+      val colNames = (1 to metadata.getColumnCount) map metadata.getColumnName
+      Iterator.continually(buildMapFromQueryResult(queryResult, colNames)).takeWhile(_.isDefined)
+        .map(_.get).toList
+    }
+
+    private def buildMapFromQueryResult(queryResult: ResultSet,
+        colNames: Seq[String]): Option[Map[String, Any]] = {
+      if (queryResult.next()) {
+        Some(colNames.map(name => name -> queryResult.getObject(name)).toMap)
+      }
+      else {
+        None
+      }
+    }
+  }
+
 }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
index eaef9c1..0729713 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
@@ -59,6 +59,11 @@ object Spark2TestQueryExecutor {
     FileFactory.getConfiguration.
       set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER")
   }
+
+  if (System.getProperty("spark.hadoop.hive.metastore.uris") != null) {
+    conf.set("spark.hadoop.hive.metastore.uris",
+      System.getProperty("spark.hadoop.hive.metastore.uris"))
+  }
   val metaStoreDB = s"$integrationPath/spark-common-cluster-test/target"
   val spark = SparkSession
     .builder().config(conf)
diff --git a/pom.xml b/pom.xml
index 454f71d..cfe74c2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -121,6 +121,8 @@
     <dev.path>${basedir}/dev</dev.path>
     <spark.master.url>local[2]</spark.master.url>
     <hdfs.url>local</hdfs.url>
+    <presto.jdbc.url>localhost:8086</presto.jdbc.url>
+    <spark.hadoop.hive.metastore.uris>thrift://localhost:8086</spark.hadoop.hive.metastore.uris>
     <suite.name>org.apache.carbondata.cluster.sdv.suite.SDVSuites</suite.name>
     <script.exetension>.sh</script.exetension>
     <carbon.hive.based.metastore>false</carbon.hive.based.metastore>
diff --git a/processing/pom.xml b/processing/pom.xml
index b90795c..594107f 100644
--- a/processing/pom.xml
+++ b/processing/pom.xml
@@ -101,5 +101,13 @@
       </plugin>
     </plugins>
   </build>
+  <profiles>
+    <profile>
+      <id>sdvtest</id>
+      <properties>
+        <maven.test.skip>true</maven.test.skip>
+      </properties>
+    </profile>
+  </profiles>
 
 </project>
diff --git a/store/sdk/pom.xml b/store/sdk/pom.xml
index f774bb2..ffd8e4e 100644
--- a/store/sdk/pom.xml
+++ b/store/sdk/pom.xml
@@ -90,4 +90,12 @@
       </plugin>
     </plugins>
   </build>
+  <profiles>
+    <profile>
+      <id>sdvtest</id>
+      <properties>
+        <maven.test.skip>true</maven.test.skip>
+      </properties>
+    </profile>
+  </profiles>
 </project>
diff --git a/streaming/pom.xml b/streaming/pom.xml
index 50d7503..aa4e2bc 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -107,4 +107,12 @@
       </plugin>
     </plugins>
   </build>
+  <profiles>
+    <profile>
+      <id>sdvtest</id>
+      <properties>
+        <maven.test.skip>true</maven.test.skip>
+      </properties>
+    </profile>
+  </profiles>
 </project>
diff --git a/tools/cli/pom.xml b/tools/cli/pom.xml
index 4381ed3..ef8804b 100644
--- a/tools/cli/pom.xml
+++ b/tools/cli/pom.xml
@@ -88,4 +88,12 @@
       </plugin>
     </plugins>
   </build>
+  <profiles>
+    <profile>
+      <id>sdvtest</id>
+      <properties>
+        <maven.test.skip>true</maven.test.skip>
+      </properties>
+    </profile>
+  </profiles>
 </project>


[carbondata] 01/27: [CARBONDATA-3260] Fix the Hive stats issue in carbon catalog table

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit 01c1ef6d292bc7a0d10eb0297084ab4f92859b1f
Author: ravipesala <ra...@gmail.com>
AuthorDate: Thu Jan 17 19:20:51 2019 +0530

    [CARBONDATA-3260] Fix the Hive stats issue in carbon catalog table
    
    Problem:
    When carbon table is created hive calculate some junk stats keep it
    in hive catalog table. From Spark 2.3 onwards spark considering the
    hive stats to calculate the broadcast join, so it is not working.
    
    Solution:
    Set the hive stats to None in case of carbon table.
    
    This closes #3082
---
 .../apache/spark/util/CarbonReflectionUtils.scala  |  8 +++++++
 .../apache/spark/sql/hive/CarbonSessionUtil.scala  | 17 ++++++++++++--
 streaming/pom.xml                                  | 27 ----------------------
 3 files changed, 23 insertions(+), 29 deletions(-)

diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
index 92f35f6..ee635e0 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
@@ -361,6 +361,14 @@ object CarbonReflectionUtils {
         instanceMirror.reflectField(field.asTerm).set(updatedSerdeMap)
       case _ =>
     }
+  }
 
+  /**
+   * This method updates the field of case class through reflection.
+   */
+  def setFieldToCaseClass(caseObj: Object, fieldName: String, objToSet: Object): Unit = {
+    val nameField = caseObj.getClass.getDeclaredField(fieldName)
+    nameField.setAccessible(true)
+    nameField.set(caseObj, objToSet)
   }
 }
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala
index 2accbd6..88a2565 100644
--- a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala
@@ -70,8 +70,21 @@ object CarbonSessionUtil {
             "tableMeta",
             relation
           ).asInstanceOf[CatalogTable]
-        isRelationRefreshed =
-          CarbonEnv.refreshRelationFromCache(catalogTable.identifier)(sparkSession)
+        catalogTable.provider match {
+          case Some(provider)
+            if provider.equals("org.apache.spark.sql.CarbonSource") ||
+               provider.equalsIgnoreCase("carbondata") =>
+            // Update stats to none in case of carbon table as we are not expecting any stats from
+            // Hive. Hive gives wrong stats for carbon table.
+            catalogTable.stats match {
+              case Some(stats) =>
+                CarbonReflectionUtils.setFieldToCaseClass(catalogTable, "stats", None)
+              case _ =>
+            }
+            isRelationRefreshed =
+              CarbonEnv.refreshRelationFromCache(catalogTable.identifier)(sparkSession)
+          case _ =>
+        }
       case _ =>
     }
     isRelationRefreshed
diff --git a/streaming/pom.xml b/streaming/pom.xml
index 38beb23..50d7503 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -105,33 +105,6 @@
           <failIfNoTests>false</failIfNoTests>
         </configuration>
       </plugin>
-      <plugin>
-        <groupId>org.scalatest</groupId>
-        <artifactId>scalatest-maven-plugin</artifactId>
-        <version>1.0</version>
-        <!-- Note config is repeated in surefire config -->
-        <configuration>
-          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
-          <junitxml>.</junitxml>
-          <filereports>CarbonTestSuite.txt</filereports>
-          <argLine> ${argLine} -ea -Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
-          </argLine>
-          <stderr />
-          <environmentVariables>
-          </environmentVariables>
-          <systemProperties>
-            <java.awt.headless>true</java.awt.headless>
-          </systemProperties>
-        </configuration>
-        <executions>
-          <execution>
-            <id>test</id>
-            <goals>
-              <goal>test</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
     </plugins>
   </build>
 </project>