Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/03/14 04:18:00 UTC

[01/13] carbondata git commit: [Documentation] Updated Readme for Datamap Feature [Forced Update!]

Repository: carbondata
Updated Branches:
  refs/heads/master 030ef947e -> 881ea1e12 (forced update)


[Documentation] Updated Readme for Datamap Feature

The README is updated with links to the new DataMap topics (Pre-aggregate DataMap and Timeseries DataMap).

This closes #2044


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/78d01145
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/78d01145
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/78d01145

Branch: refs/heads/master
Commit: 78d01145761d7c2199b0f0065592ce25884ac316
Parents: 6ae56b9
Author: sgururajshetty <sg...@gmail.com>
Authored: Thu Mar 8 11:32:17 2018 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Mar 14 12:09:57 2018 +0800

----------------------------------------------------------------------
 README.md | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/78d01145/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index c466cc7..3c9fe6a 100644
--- a/README.md
+++ b/README.md
@@ -43,6 +43,8 @@ CarbonData is built using Apache Maven, to [build CarbonData](https://github.com
 * [Cluster Installation and Deployment](https://github.com/apache/carbondata/blob/master/docs/installation-guide.md)
 * [Configuring Carbondata](https://github.com/apache/carbondata/blob/master/docs/configuration-parameters.md)
 * [Streaming Ingestion](https://github.com/apache/carbondata/blob/master/docs/streaming-guide.md)
+* [CarbonData Pre-aggregate DataMap](https://github.com/apache/carbondata/blob/master/docs/datamap/preaggregate-datamap-guide.md)
+* [CarbonData Timeseries DataMap](https://github.com/apache/carbondata/blob/master/docs/datamap/timeseries-datamap-guide.md)
 * [FAQ](https://github.com/apache/carbondata/blob/master/docs/faq.md)
 * [Trouble Shooting](https://github.com/apache/carbondata/blob/master/docs/troubleshooting.md)
 * [Useful Tips](https://github.com/apache/carbondata/blob/master/docs/useful-tips-on-carbondata.md)


[10/13] carbondata git commit: [CARBONDATA-2235]Update configuration-parameters.md

Posted by ja...@apache.org.
[CARBONDATA-2235]Update configuration-parameters.md

carbon.query.show.datamaps

This property is a system configuration. If it is set to true, SHOW TABLES will list all tables, including datamap tables (e.g. pre-aggregate tables); if it is set to false, SHOW TABLES will filter out the datamap tables and show only the main tables.
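A minimal sketch of toggling this property, assuming a Spark session with CarbonData where `sql(...)` is available (the table name in the comments is illustrative, not taken from this commit):

```scala
import org.apache.carbondata.core.util.CarbonProperties

// Default behaviour (true): SHOW TABLES lists main tables and their
// datamap tables (for example a pre-aggregate table of "sales";
// the name is an assumption for illustration only).
CarbonProperties.getInstance()
  .addProperty("carbon.query.show.datamaps", "true")
sql("SHOW TABLES").show()

// Set to false: datamap tables are filtered out and only the main
// tables remain in the listing.
CarbonProperties.getInstance()
  .addProperty("carbon.query.show.datamaps", "false")
sql("SHOW TABLES").show()
```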

This closes #2041


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e9dadfe0
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e9dadfe0
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e9dadfe0

Branch: refs/heads/master
Commit: e9dadfe0112f57400583e0820e3912c684b51ef7
Parents: 6772e54
Author: akashrn5 <ak...@gmail.com>
Authored: Wed Mar 7 17:33:10 2018 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Mar 14 12:11:11 2018 +0800

----------------------------------------------------------------------
 docs/configuration-parameters.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9dadfe0/docs/configuration-parameters.md
----------------------------------------------------------------------
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index 4a66f2e..c4724cc 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -38,6 +38,7 @@ This section provides the details of all the configurations required for the Car
 | carbon.data.file.version | V3 | If this parameter value is set to 1, then CarbonData will support the data load which is in old format(0.x version). If the value is set to 2(1.x onwards version), then CarbonData will support the data load of new format only.|
 | carbon.streaming.auto.handoff.enabled | true | If this parameter value is set to true, auto trigger handoff function will be enabled.|
 | carbon.streaming.segment.max.size | 1024000000 | This parameter defines the maximum size of the streaming segment. Setting this parameter to appropriate value will avoid impacting the streaming ingestion. The value is in bytes.|
+| carbon.query.show.datamaps | true | If this parameter value is set to true, show tables command will list all the tables including datamaps (e.g. Preaggregate table), else datamaps will be excluded from the table list. |
 
 ##  Performance Configuration
 This section provides the details of all the configurations required for CarbonData Performance Optimization.


[05/13] carbondata git commit: [HOTFIX] Fix unsafe load in test case

Posted by ja...@apache.org.
[HOTFIX] Fix unsafe load in test case

Unsafe load fails for dictionary columns because of a refactoring.

This closes #2051


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/18380a6b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/18380a6b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/18380a6b

Branch: refs/heads/master
Commit: 18380a6b4747f6b7a77da057a829426191fae2a9
Parents: 78d0114
Author: ravipesala <ra...@gmail.com>
Authored: Sat Mar 10 20:56:22 2018 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Mar 14 12:11:09 2018 +0800

----------------------------------------------------------------------
 .../apache/carbondata/core/locks/LocalFileLock.java    | 13 -------------
 pom.xml                                                |  7 +++----
 .../sort/unsafe/comparator/UnsafeRowComparator.java    |  3 +--
 3 files changed, 4 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/18380a6b/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java b/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
index cb80877..e3b3126 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
@@ -27,7 +27,6 @@ import java.nio.file.StandardOpenOption;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.util.CarbonUtil;
@@ -136,18 +135,6 @@ public class LocalFileLock extends AbstractCarbonLock {
       status = false;
     } finally {
       CarbonUtil.closeStreams(channel);
-
-      // deleting the lock file after releasing the lock.
-      if (null != lockFilePath) {
-        CarbonFile lockFile = FileFactory.getCarbonFile(lockFilePath,
-            FileFactory.getFileType(lockFilePath));
-        if (!lockFile.exists() || lockFile.delete()) {
-          LOGGER.info("Successfully deleted the lock file " + lockFilePath);
-        } else {
-          LOGGER.error("Not able to delete the lock file " + lockFilePath);
-          status = false;
-        }
-      }
     }
     return status;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/18380a6b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index fdc9210..972be1e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -99,9 +99,10 @@
     <module>core</module>
     <module>processing</module>
     <module>hadoop</module>
-    <module>store/sdk</module>
     <module>integration/spark-common</module>
     <module>integration/spark-common-test</module>
+    <module>datamap/examples</module>
+    <module>store/sdk</module>
     <module>assembly</module>
   </modules>
 
@@ -266,6 +267,7 @@
         <groupId>org.scalatest</groupId>
         <artifactId>scalatest_${scala.binary.version}</artifactId>
         <version>2.2.1</version>
+        <scope>test</scope>
       </dependency>
     </dependencies>
   </dependencyManagement>
@@ -342,7 +344,6 @@
         <artifactId>findbugs-maven-plugin</artifactId>
         <version>3.0.4</version>
         <configuration>
-          <skip>true</skip>
           <excludeFilterFile>${dev.path}/findbugs-exclude.xml</excludeFilterFile>
           <failOnError>true</failOnError>
           <findbugsXmlOutput>true</findbugsXmlOutput>
@@ -444,7 +445,6 @@
         <module>format</module>
         <module>integration/spark2</module>
         <module>examples/spark2</module>
-        <module>datamap/examples</module>
         <module>integration/hive</module>
         <module>integration/presto</module>
         <module>examples/flink</module>
@@ -481,7 +481,6 @@
         <module>integration/presto</module>
         <module>streaming</module>
         <module>examples/spark2</module>
-        <module>datamap/examples</module>
         <module>datamap/lucene</module>
       </modules>
       <build>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/18380a6b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java
index 33342dc..8f29cee 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java
@@ -33,8 +33,7 @@ public class UnsafeRowComparator implements Comparator<UnsafeCarbonRow> {
   public UnsafeRowComparator(UnsafeCarbonRowPage rowPage) {
     this.baseObject = rowPage.getDataBlock().getBaseObject();
     this.tableFieldStat = rowPage.getTableFieldStat();
-    this.dictSizeInMemory = (tableFieldStat.getDictSortDimCnt()
-        + tableFieldStat.getDictNoSortDimCnt()) * 4;
+    this.dictSizeInMemory = tableFieldStat.getDictSortDimCnt() * 4;
   }
 
   /**


[09/13] carbondata git commit: [CARBONDATA-2241][Docs][BugFix] Updated Doc for query which will execute on datamap

Posted by ja...@apache.org.
[CARBONDATA-2241][Docs][BugFix] Updated Doc for query which will execute on datamap

Fix: Corrected the query so that it will execute using the datamap.
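As a hedged illustration of the corrected behaviour (assuming the `sales` table and its pre-aggregate datamap from the guide, and a session where `sql(...)` is available), the query is written against the main table and CarbonData's planner rewrites it to use the pre-aggregate table:

```scala
// Query the main table only; pre-aggregate tables cannot be queried
// directly. CarbonData's query planner rewrites this to the associated
// pre-aggregate datamap when the aggregation matches it.
sql("SELECT avg(price), country FROM sales GROUP BY country").show()
```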

This closes #2048


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6772e547
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6772e547
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6772e547

Branch: refs/heads/master
Commit: 6772e547ce0f90ff735d439d4f76cf407fa41979
Parents: 4119062
Author: SangeetaGulia <sa...@knoldus.in>
Authored: Fri Mar 9 11:13:45 2018 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Mar 14 12:11:11 2018 +0800

----------------------------------------------------------------------
 docs/datamap/preaggregate-datamap-guide.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/6772e547/docs/datamap/preaggregate-datamap-guide.md
----------------------------------------------------------------------
diff --git a/docs/datamap/preaggregate-datamap-guide.md b/docs/datamap/preaggregate-datamap-guide.md
index 199f674..84f06c4 100644
--- a/docs/datamap/preaggregate-datamap-guide.md
+++ b/docs/datamap/preaggregate-datamap-guide.md
@@ -181,7 +181,7 @@ SELECT country, sex, sum(quantity), avg(price) from sales GROUP BY country, sex
 
 SELECT sex, sum(quantity) from sales GROUP BY sex
 
-SELECT sum(price), country from sales GROUP BY country
+SELECT avg(price), country from sales GROUP BY country
 ``` 
 
 will be transformed by CarbonData's query planner to query against pre-aggregate table 
@@ -211,7 +211,7 @@ tables are loaded successfully, if one of these loads fails, new data are not vi
 as if the load operation is not happened.   
 
 ## Querying data
-As a technique for query acceleration, Pre-aggregate tables cannot be queries directly. 
+As a technique for query acceleration, Pre-aggregate tables cannot be queried directly. 
 Queries are to be made on main table. While doing query planning, internally CarbonData will check 
 associated pre-aggregate tables with the main table, and do query plan transformation accordingly. 
 


[12/13] carbondata git commit: [CARBONDATA-2194] Exception is improper when using an incorrect bad record action type

Posted by ja...@apache.org.
[CARBONDATA-2194] Exception is improper when using an incorrect bad record action type
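For context, a hedged sketch of a load that sets a valid BAD_RECORDS_ACTION; per the corrected message in this commit, the accepted values are FORCE, IGNORE, REDIRECT, and FAIL (the table name and CSV path below are illustrative assumptions):

```scala
// Illustrative table name and path; any value other than
// FORCE/IGNORE/REDIRECT/FAIL now raises the improved error message.
sql("""
  LOAD DATA LOCAL INPATH '/path/to/data.csv' INTO TABLE sales
  OPTIONS('BAD_RECORDS_ACTION'='FAIL', 'DELIMITER'=',', 'QUOTECHAR'='"')
""")
```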

This closes #1989


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5ab30995
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5ab30995
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5ab30995

Branch: refs/heads/master
Commit: 5ab30995f85dcde080033a63aab257367e8b36a4
Parents: 334a420
Author: xubo245 <60...@qq.com>
Authored: Thu Feb 22 16:31:12 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Mar 14 12:11:11 2018 +0800

----------------------------------------------------------------------
 .../apache/carbondata/core/util/SessionParams.java |  2 +-
 .../badrecordloger/BadRecordLoggerTest.scala       |  2 +-
 .../StandardPartitionBadRecordLoggerTest.scala     | 17 +++++++++++++++++
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala    |  2 +-
 4 files changed, 20 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ab30995/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index c232b1e..68d0daa 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -157,7 +157,7 @@ public class SessionParams implements Serializable {
           isValid = true;
         } catch (IllegalArgumentException iae) {
           throw new InvalidConfigurationException(
-              "The key " + key + " can have only either FORCE or IGNORE or REDIRECT.");
+              "The key " + key + " can have only either FORCE or IGNORE or REDIRECT or FAIL.");
         }
         break;
       case CARBON_OPTIONS_SORT_SCOPE:

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ab30995/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala
index 694d25b..b6ba0e0 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala
@@ -242,7 +242,7 @@ class BadRecordLoggerTest extends QueryTest with BeforeAndAfterAll {
           + "('bad_records_action'='FORCA', 'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
     } catch {
       case ex: Exception =>
-        assert("option BAD_RECORDS_ACTION can have only either FORCE or IGNORE or REDIRECT"
+        assert("option BAD_RECORDS_ACTION can have only either FORCE or IGNORE or REDIRECT or FAIL"
           .equals(ex.getMessage))
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ab30995/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala
index 7b93766..8e1f13b 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala
@@ -204,6 +204,23 @@ class StandardPartitionBadRecordLoggerTest extends QueryTest with BeforeAndAfter
     )
   }
 
+  test("test load ddl command") {
+    sql(
+      """CREATE TABLE IF NOT EXISTS dataloadOptionTests(ID BigInt, date Timestamp, country
+           String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'
+      """)
+    val csvFilePath = s"$resourcesPath/badrecords/emptyTimeStampValue.csv"
+    try {
+      sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE dataloadOptionTests OPTIONS"
+        + "('bad_records_action'='FORCA', 'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
+    } catch {
+      case ex: Exception =>
+        assert("option BAD_RECORDS_ACTION can have only either FORCE or IGNORE or REDIRECT or FAIL"
+          .equals(ex.getMessage))
+    }
+  }
+
   def drop(): Unit = {
     sql("drop table IF EXISTS sales")
     sql("drop table IF EXISTS serializable_values")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ab30995/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 54ac7dc..a377790 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -950,7 +950,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
       catch {
         case _: IllegalArgumentException =>
           throw new MalformedCarbonCommandException(
-            "option BAD_RECORDS_ACTION can have only either FORCE or IGNORE or REDIRECT")
+            "option BAD_RECORDS_ACTION can have only either FORCE or IGNORE or REDIRECT or FAIL")
       }
     }
     if (options.exists(_._1.equalsIgnoreCase("IS_EMPTY_DATA_BAD_RECORD"))) {


[06/13] carbondata git commit: [CARBONDATA-2032][DataLoad] directly write carbon data files to HDFS

Posted by ja...@apache.org.
[CARBONDATA-2032][DataLoad] directly write carbon data files to HDFS

Currently, during data loading, CarbonData writes the final data files to local disk and then copies them to HDFS.
To save disk IO, CarbonData can skip this procedure and write these files directly to HDFS.
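A minimal sketch of enabling the new behaviour, modelled on the test added in this commit (assuming a session where `sql(...)` is available; the CSV path and table name are illustrative):

```scala
import org.apache.carbondata.core.constants.CarbonLoadOptionConstants
import org.apache.carbondata.core.util.CarbonProperties

// carbon.load.directWriteHdfs.enabled defaults to "false". When set to
// "true", the fact data writer streams carbondata/index files straight to
// HDFS instead of writing to a local temp directory and copying afterwards.
CarbonProperties.getInstance().addProperty(
  CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS, "true")

sql("LOAD DATA LOCAL INPATH '/path/to/sample.csv' INTO TABLE loadtest")

// Restore the default so later loads use the copy-from-local-disk path.
CarbonProperties.getInstance().addProperty(
  CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS,
  CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS_DEFAULT)
```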

This closes #1825


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/41190629
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/41190629
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/41190629

Branch: refs/heads/master
Commit: 41190629e76f104447ee0320f8b2a69a40054369
Parents: 07d4da7
Author: xuchuanyin <xu...@hust.edu.cn>
Authored: Fri Mar 9 09:50:55 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Mar 14 12:11:11 2018 +0800

----------------------------------------------------------------------
 .../constants/CarbonLoadOptionConstants.java    |   5 +
 .../filesystem/AbstractDFSCarbonFile.java       |  39 +++-
 .../core/datastore/filesystem/CarbonFile.java   |  29 +++
 .../datastore/filesystem/LocalCarbonFile.java   |  26 ++-
 .../core/datastore/impl/FileFactory.java        |  41 ++++
 .../apache/carbondata/core/util/CarbonUtil.java |  27 +++
 .../hadoop/test/util/StoreCreator.java          |  24 --
 .../dataload/TestLoadDataGeneral.scala          |  20 +-
 .../store/writer/AbstractFactDataWriter.java    | 230 ++++++++++---------
 .../writer/v3/CarbonFactDataWriterImplV3.java   |  31 ++-
 .../processing/util/CarbonLoaderUtil.java       |  11 +-
 .../carbondata/processing/StoreCreator.java     |  23 --
 12 files changed, 322 insertions(+), 184 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/41190629/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
index 8ff8dc4..823f568 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
@@ -134,4 +134,9 @@ public final class CarbonLoadOptionConstants {
    * row delimiter for each sort column bounds
    */
   public static final String SORT_COLUMN_BOUNDS_ROW_DELIMITER = ";";
+
+  @CarbonProperty
+  public static final String ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS
+      = "carbon.load.directWriteHdfs.enabled";
+  public static final String ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS_DEFAULT = "false";
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/41190629/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
index fd5dc40..8cf3efe 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
@@ -357,7 +357,8 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
     }
   }
 
-  @Override public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType)
+  @Override
+  public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType)
       throws IOException {
     path = path.replace("\\", "/");
     Path pt = new Path(path);
@@ -365,15 +366,26 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
     return fs.create(pt, true);
   }
 
-  @Override public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType,
+  @Override
+  public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType,
       int bufferSize, long blockSize) throws IOException {
     path = path.replace("\\", "/");
     Path pt = new Path(path);
+    short replication = pt.getFileSystem(FileFactory.getConfiguration()).getDefaultReplication(pt);
+    return getDataOutputStream(path, fileType, bufferSize, blockSize, replication);
+  }
+
+  @Override
+  public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType,
+      int bufferSize, long blockSize, short replication) throws IOException {
+    path = path.replace("\\", "/");
+    Path pt = new Path(path);
     FileSystem fs = pt.getFileSystem(FileFactory.getConfiguration());
-    return fs.create(pt, true, bufferSize, fs.getDefaultReplication(pt), blockSize);
+    return fs.create(pt, true, bufferSize, replication, blockSize);
   }
 
-  @Override public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType,
+  @Override
+  public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType,
       int bufferSize, String compressor) throws IOException {
     path = path.replace("\\", "/");
     Path pt = new Path(path);
@@ -518,7 +530,8 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
    */
   protected abstract CarbonFile[] getFiles(FileStatus[] listStatus);
 
-  @Override public String[] getLocations() throws IOException {
+  @Override
+  public String[] getLocations() throws IOException {
     BlockLocation[] blkLocations;
     if (fileStatus instanceof LocatedFileStatus) {
       blkLocations = ((LocatedFileStatus)fileStatus).getBlockLocations();
@@ -529,4 +542,20 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
 
     return blkLocations[0].getHosts();
   }
+
+  @Override
+  public boolean setReplication(String filePath, short replication) throws IOException {
+    filePath = filePath.replace("\\", "/");
+    Path path = new Path(filePath);
+    FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
+    return fs.setReplication(path, replication);
+  }
+
+  @Override
+  public short getDefaultReplication(String filePath) throws IOException {
+    filePath = filePath.replace("\\", "/");
+    Path path = new Path(filePath);
+    FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
+    return fs.getDefaultReplication(path);
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/41190629/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
index 80c0510..a1d6672 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
@@ -106,6 +106,18 @@ public interface CarbonFile {
 
   /**
    * get data output stream
+   * @param path file path
+   * @param fileType file type
+   * @param bufferSize write buffer size
+   * @param blockSize block size
+   * @param replication replication for this file
+   * @return data output stream
+   * @throws IOException if error occurs
+   */
+  DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType, int bufferSize,
+      long blockSize, short replication) throws IOException;
+  /**
+   * get data output stream
    * @param path
    * @param fileType
    * @param bufferSize
@@ -141,4 +153,21 @@ public interface CarbonFile {
    */
   String[] getLocations() throws IOException;
 
+  /**
+   * set the replication factor for this file
+   *
+   * @param filePath file path
+   * @param replication replication
+   * @return true, if success; false, if failed
+   * @throws IOException if error occurs
+   */
+  boolean setReplication(String filePath, short replication) throws IOException;
+
+  /**
+   * get the default replication for this file
+   * @param filePath file path
+   * @return replication factor
+   * @throws IOException if error occurs
+   */
+  short getDefaultReplication(String filePath) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/41190629/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
index 24022ad..d4ed2b6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
@@ -309,21 +309,30 @@ public class LocalCarbonFile implements CarbonFile {
     return new DataInputStream(new BufferedInputStream(fis));
   }
 
-  @Override public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType)
+  @Override
+  public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType)
       throws IOException {
     path = path.replace("\\", "/");
     path = FileFactory.getUpdatedFilePath(path, FileFactory.FileType.LOCAL);
     return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path)));
   }
 
-  @Override public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType,
+  @Override
+  public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType,
       int bufferSize, long blockSize) throws IOException {
+    return getDataOutputStream(path, fileType, bufferSize, blockSize, (short) 1);
+  }
+
+  @Override
+  public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType,
+      int bufferSize, long blockSize, short replication) throws IOException {
     path = path.replace("\\", "/");
     path = FileFactory.getUpdatedFilePath(path, fileType);
     return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path), bufferSize));
   }
 
-  @Override public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType,
+  @Override
+  public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType,
       int bufferSize, String compressor) throws IOException {
     path = path.replace("\\", "/");
     path = FileFactory.getUpdatedFilePath(path, fileType);
@@ -426,4 +435,15 @@ public class LocalCarbonFile implements CarbonFile {
   @Override public String[] getLocations() throws IOException {
     return new String[]{"localhost"};
   }
+
+  @Override
+  public boolean setReplication(String filePath, short replication) throws IOException {
+    // local carbon file does not need replication
+    return true;
+  }
+
+  @Override
+  public short getDefaultReplication(String filePath) throws IOException {
+    return 1;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/41190629/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
index 9bcdfae..ef84fb3 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
@@ -151,6 +151,21 @@ public final class FileFactory {
   }
 
   /**
+   * get data output stream
+   * @param path file path
+   * @param fileType file type
+   * @param bufferSize write buffer size
+   * @param blockSize block size
+   * @param replication replication
+   * @return data output stream
+   * @throws IOException if error occurs
+   */
+  public static DataOutputStream getDataOutputStream(String path, FileType fileType, int bufferSize,
+      long blockSize, short replication) throws IOException {
+    return getCarbonFile(path).getDataOutputStream(path, fileType, bufferSize, blockSize,
+        replication);
+  }
+  /**
    * get data out put stream
    * @param path
    * @param fileType
@@ -457,4 +472,30 @@ public final class FileFactory {
     }
   }
 
+  /**
+   * set the file replication
+   *
+   * @param path file path
+   * @param fileType file type
+   * @param replication replication
+   * @return true, if success; false, if failed
+   * @throws IOException if error occurs
+   */
+  public static boolean setReplication(String path, FileFactory.FileType fileType,
+      short replication) throws IOException {
+    return getCarbonFile(path, fileType).setReplication(path, replication);
+  }
+
+  /**
+   * get the default replication
+   *
+   * @param path file path
+   * @param fileType file type
+   * @return replication
+   * @throws IOException if error occurs
+   */
+  public static short getDefaultReplication(String path, FileFactory.FileType fileType)
+      throws IOException {
+    return getCarbonFile(path, fileType).getDefaultReplication(path);
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/41190629/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index bed6aaa..b961b60 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2448,6 +2448,33 @@ public final class CarbonUtil {
   }
 
   /**
+   * This method will complete the remaining hdfs replications
+   *
+   * @param fileName hdfs file name
+   * @param fileType filetype
+   * @throws CarbonDataWriterException if error occurs
+   */
+  public static void completeRemainingHdfsReplicas(String fileName, FileFactory.FileType fileType)
+    throws CarbonDataWriterException {
+    try {
+      long startTime = System.currentTimeMillis();
+      short replication = FileFactory.getDefaultReplication(fileName, fileType);
+      if (1 == replication) {
+        return;
+      }
+      boolean replicateFlag = FileFactory.setReplication(fileName, fileType, replication);
+      if (!replicateFlag) {
+        LOGGER.error("Failed to set replication for " + fileName + " with factor " + replication);
+      }
+      LOGGER.info(
+          "Total copy time (ms) to copy file " + fileName + " is " + (System.currentTimeMillis()
+              - startTime));
+    } catch (IOException e) {
+      throw new CarbonDataWriterException("Problem while completing remaining HDFS backups", e);
+    }
+  }
+
+  /**
    * This method will read the local carbon data file and write to carbon data file in HDFS
    *
    * @param carbonStoreFilePath

http://git-wip-us.apache.org/repos/asf/carbondata/blob/41190629/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
index 1fc0508..8e8916d 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
@@ -431,30 +431,6 @@ public class StoreCreator {
 
     writeLoadMetadata(loadModel.getCarbonDataLoadSchema(), loadModel.getTableName(), loadModel.getTableName(),
         new ArrayList<LoadMetadataDetails>());
-
-    String segLocation =
-        storeLocation + "/" + databaseName + "/" + tableName + "/Fact/Part0/Segment_0";
-    File file = new File(segLocation);
-    File factFile = null;
-    File[] folderList = file.listFiles();
-    File folder = null;
-    for (int i = 0; i < folderList.length; i++) {
-      if (folderList[i].isDirectory()) {
-        folder = folderList[i];
-      }
-    }
-    if (folder.isDirectory()) {
-      File[] files = folder.listFiles();
-      for (int i = 0; i < files.length; i++) {
-        if (!files[i].isDirectory() && files[i].getName().startsWith("part")) {
-          factFile = files[i];
-          break;
-        }
-      }
-      //      Files.copy(factFile.toPath(), file.toPath(), REPLACE_EXISTING);
-      factFile.renameTo(new File(segLocation + "/" + factFile.getName()));
-      CarbonUtil.deleteFoldersAndFiles(folder);
-    }
   }
 
   public static void writeLoadMetadata(CarbonDataLoadSchema schema, String databaseName,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/41190629/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
index e5075ef..43b215e 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
@@ -29,7 +29,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.spark.sql.test.util.QueryTest
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
 import org.apache.carbondata.core.util.CarbonProperties
 
 class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
@@ -242,6 +242,24 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
     checkAnswer(sql("select * from stale"), Row("k"))
   }
 
+  test("test data loading with directly writing fact data to hdfs") {
+    val originStatus = CarbonProperties.getInstance().getProperty(
+      CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS,
+      CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS_DEFAULT)
+    CarbonProperties.getInstance().addProperty(
+      CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS, "true")
+
+    val testData = s"$resourcesPath/sample.csv"
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table loadtest")
+    checkAnswer(
+      sql("SELECT COUNT(*) FROM loadtest"),
+      Seq(Row(6))
+    )
+
+    CarbonProperties.getInstance().addProperty(
+      CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS,
+      originStatus)
+  }
   override def afterEach {
     sql("DROP TABLE if exists loadtest")
     sql("drop table if exists invalidMeasures")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/41190629/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 4064c0d..5783fe5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -17,15 +17,12 @@
 
 package org.apache.carbondata.processing.store.writer;
 
+import java.io.DataOutputStream;
 import java.io.File;
-import java.io.FileFilter;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.nio.channels.FileChannel;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.Callable;
@@ -38,7 +35,9 @@ import java.util.concurrent.TimeUnit;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
 import org.apache.carbondata.core.metadata.converter.SchemaConverter;
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
@@ -65,13 +64,18 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
       LogServiceFactory.getLogService(AbstractFactDataWriter.class.getName());
 
   /**
-   * file channel
+   * file channel to write
    */
-  protected FileChannel fileChannel;
+  protected WritableByteChannel fileChannel;
+  protected long currentOffsetInFile;
+  /**
+   * The path of CarbonData file to write in hdfs
+   */
+  private String carbonDataFileHdfsPath;
   /**
    * The temp path of carbonData file used on executor
    */
-  protected String carbonDataFileTempPath;
+  private String carbonDataFileTempPath;
 
   /**
    * The name of carbonData file (blockId)
@@ -125,7 +129,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
    */
   private long currentFileSize;
 
-  protected FileOutputStream fileOutputStream;
+  protected DataOutputStream fileOutputStream;
 
   protected List<BlockIndexInfo> blockIndexInfoList;
 
@@ -143,6 +147,10 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
    * listener to write data map
    */
   protected DataMapWriterListener listener;
+  /**
+   * Whether directly write fact data to hdfs
+   */
+  private boolean enableDirectlyWriteData2Hdfs = false;
 
   public AbstractFactDataWriter(CarbonFactDataHandlerModel model) {
     this.model = model;
@@ -163,8 +171,19 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
     LOGGER.info("Total file size: " + fileSizeInBytes + " and dataBlock Size: " +
         blockSizeThreshold);
 
+    // whether to directly write fact data to HDFS
+    String directlyWriteData2Hdfs = propInstance.getProperty(
+        CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS,
+        CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS_DEFAULT);
+    this.enableDirectlyWriteData2Hdfs = "TRUE".equalsIgnoreCase(directlyWriteData2Hdfs);
+    if (enableDirectlyWriteData2Hdfs) {
+      LOGGER.info("Carbondata will directly write fact data to HDFS.");
+    } else {
+      LOGGER.info("Carbondata will write temporary fact data to local disk.");
+    }
+
     this.executorService = Executors.newFixedThreadPool(1,
-        new CarbonThreadFactory("LocalToHDFSCopyPool:" + this.model.getTableName()));
+        new CarbonThreadFactory("CompleteHDFSBackendPool:" + this.model.getTableName()));
     executorServiceSubmitList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     // in case of compaction we will pass the cardinality.
     this.localCardinality = this.model.getColCardinality();
@@ -210,10 +229,12 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
       throws CarbonDataWriterException {
     if ((currentFileSize + blockletSizeToBeAdded) >= blockSizeThreshold && currentFileSize != 0) {
       // set the current file size to zero
+      String activeFile =
+          enableDirectlyWriteData2Hdfs ? carbonDataFileHdfsPath : carbonDataFileTempPath;
       LOGGER.info("Writing data to file as max file size reached for file: "
-          + carbonDataFileTempPath + " .Data block size: " + currentFileSize);
+          + activeFile + ". Data block size: " + currentFileSize);
       // write meta data to end of the existing file
-      writeBlockletInfoToFile(fileChannel, carbonDataFileTempPath);
+      writeBlockletInfoToFile();
       this.currentFileSize = 0;
       this.dataChunksOffsets = new ArrayList<>();
       this.dataChunksLength = new ArrayList<>();
@@ -229,7 +250,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
   private void notifyDataMapBlockStart() {
     if (listener != null) {
       try {
-        listener.onBlockStart(carbonDataFileName, constructFactFileFullPath());
+        listener.onBlockStart(carbonDataFileName, carbonDataFileHdfsPath);
       } catch (IOException e) {
         throw new CarbonDataWriterException("Problem while writing datamap", e);
       }
@@ -247,11 +268,6 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
     blockletId = 0;
   }
 
-  private String constructFactFileFullPath() {
-    String factFilePath =
-        this.model.getCarbonDataDirectoryPath() + File.separator + this.carbonDataFileName;
-    return factFilePath;
-  }
   /**
    * Finish writing current file. It will flush stream, copy and rename temp file to final file
    * @param copyInCurrentThread set to false if want to do data copy in a new thread
@@ -259,12 +275,23 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
   protected void commitCurrentFile(boolean copyInCurrentThread) {
     notifyDataMapBlockEnd();
     CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
-    if (copyInCurrentThread) {
-      CarbonUtil.copyCarbonDataFileToCarbonStorePath(
-          carbonDataFileTempPath, model.getCarbonDataDirectoryPath(),
-          fileSizeInBytes);
+    if (enableDirectlyWriteData2Hdfs) {
+      if (copyInCurrentThread) {
+        CarbonUtil.completeRemainingHdfsReplicas(carbonDataFileHdfsPath,
+            FileFactory.FileType.HDFS);
+      } else {
+        executorServiceSubmitList.add(executorService.submit(
+            new CompleteHdfsBackendThread(carbonDataFileHdfsPath, FileFactory.FileType.HDFS)));
+      }
     } else {
-      executorServiceSubmitList.add(executorService.submit(new CopyThread(carbonDataFileTempPath)));
+      if (copyInCurrentThread) {
+        CarbonUtil.copyCarbonDataFileToCarbonStorePath(carbonDataFileTempPath,
+            model.getCarbonDataDirectoryPath(),
+            fileSizeInBytes);
+      } else {
+        executorServiceSubmitList.add(executorService.submit(
+            new CompleteHdfsBackendThread(carbonDataFileTempPath, FileFactory.FileType.LOCAL)));
+      }
     }
   }
 
@@ -274,79 +301,46 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
    * @throws CarbonDataWriterException
    */
   public void initializeWriter() throws CarbonDataWriterException {
-    // update the filename with new new sequence
-    // increment the file sequence counter
-    initFileCount();
-
-    //each time we initialize writer, we choose a local temp location randomly
-    String[] tempFileLocations = model.getStoreLocation();
-    String chosenTempLocation = tempFileLocations[new Random().nextInt(tempFileLocations.length)];
-    LOGGER.info("Randomly choose factdata temp location: " + chosenTempLocation);
-
     this.carbonDataFileName = CarbonTablePath
         .getCarbonDataFileName(fileCount, model.getCarbonDataFileAttributes().getTaskId(),
             model.getBucketId(), model.getTaskExtension(),
             "" + model.getCarbonDataFileAttributes().getFactTimeStamp());
-    this.carbonDataFileTempPath = chosenTempLocation + File.separator + carbonDataFileName;
-    this.fileCount++;
+    this.carbonDataFileHdfsPath = model.getCarbonDataDirectoryPath() + File.separator
+        + carbonDataFileName;
     try {
-      // open channel for new data file
-      fileOutputStream = new FileOutputStream(this.carbonDataFileTempPath, true);
-      this.fileChannel = fileOutputStream.getChannel();
-    } catch (FileNotFoundException fileNotFoundException) {
-      throw new CarbonDataWriterException("Problem while getting the FileChannel for Leaf File",
-          fileNotFoundException);
-    }
-    notifyDataMapBlockStart();
-  }
-
-  private int initFileCount() {
-    int fileInitialCount = 0;
-    FileFilter fileFilter = new FileFilter() {
-      @Override public boolean accept(File pathVal) {
-        if (!pathVal.isDirectory() && pathVal.getName().startsWith(model.getTableName())
-            && pathVal.getName().contains(CarbonCommonConstants.FACT_FILE_EXT)) {
-          return true;
-        }
-        return false;
-      }
-    };
-
-    List<File> dataFileList = new ArrayList<File>();
-    for (String tempLoc : model.getStoreLocation()) {
-      File[] subFiles = new File(tempLoc).listFiles(fileFilter);
-      if (null != subFiles && subFiles.length > 0) {
-        dataFileList.addAll(Arrays.asList(subFiles));
+      if (enableDirectlyWriteData2Hdfs) {
+        // the block size will be twice the block_size specified by user to make sure that
+        // one carbondata file only consists exactly one HDFS block.
+        // Here we write the first replication and will complete the remaining later.
+        fileOutputStream = FileFactory.getDataOutputStream(carbonDataFileHdfsPath,
+            FileFactory.FileType.HDFS, CarbonCommonConstants.BYTEBUFFER_SIZE, fileSizeInBytes * 2,
+            (short) 1);
+      } else {
+        //each time we initialize writer, we choose a local temp location randomly
+        String[] tempFileLocations = model.getStoreLocation();
+        String chosenTempLocation =
+            tempFileLocations[new Random().nextInt(tempFileLocations.length)];
+        LOGGER.info("Randomly choose factdata temp location: " + chosenTempLocation);
+        carbonDataFileTempPath = chosenTempLocation + File.separator + carbonDataFileName;
+        fileOutputStream = FileFactory.getDataOutputStream(carbonDataFileTempPath,
+            FileFactory.FileType.LOCAL, CarbonCommonConstants.BYTEBUFFER_SIZE, true);
       }
-    }
 
-    File[] dataFiles = new File[dataFileList.size()];
-    dataFileList.toArray(dataFiles);
-    if (dataFiles != null && dataFiles.length > 0) {
-      // since files are in different directory, we should only compare the file name
-      // and ignore the directory
-      Arrays.sort(dataFiles, new Comparator<File>() {
-        @Override public int compare(File o1, File o2) {
-          return o1.getName().compareTo(o2.getName());
-        }
-      });
-      String dataFileName = dataFiles[dataFiles.length - 1].getName();
-      try {
-        fileInitialCount = Integer
-            .parseInt(dataFileName.substring(dataFileName.lastIndexOf('_') + 1).split("\\.")[0]);
-      } catch (NumberFormatException ex) {
-        fileInitialCount = 0;
-      }
-      fileInitialCount++;
+      this.fileCount++;
+      // open channel for new data file
+      this.fileChannel = Channels.newChannel(fileOutputStream);
+      this.currentOffsetInFile = 0;
+    } catch (IOException ex) {
+      throw new CarbonDataWriterException(
+          "Problem while getting the channel for fact data file", ex);
     }
-    return fileInitialCount;
+    notifyDataMapBlockStart();
   }
 
   /**
    * This method will write metadata at the end of file file format in thrift format
    */
-  protected abstract void writeBlockletInfoToFile(
-      FileChannel channel, String filePath) throws CarbonDataWriterException;
+  protected abstract void writeBlockletInfoToFile() throws CarbonDataWriterException;
 
   /**
    * Below method will be used to fill the vlock info details
@@ -395,18 +389,27 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
             model.getSchemaUpdatedTimeStamp());
     // get the block index info thrift
     List<BlockIndex> blockIndexThrift = CarbonMetadataUtil.getBlockIndexInfo(blockIndexInfoList);
-    // randomly choose a temp location for index file
-    String[] tempLocations = model.getStoreLocation();
-    String chosenTempLocation = tempLocations[new Random().nextInt(tempLocations.length)];
-    LOGGER.info("Randomly choose index file location: " + chosenTempLocation);
+    String indexFileName;
+    if (enableDirectlyWriteData2Hdfs) {
+      String rawFileName = model.getCarbonDataDirectoryPath() + File.separator + CarbonTablePath
+          .getCarbonIndexFileName(model.getCarbonDataFileAttributes().getTaskId(),
+              model.getBucketId(), model.getTaskExtension(),
+              "" + model.getCarbonDataFileAttributes().getFactTimeStamp());
+      indexFileName = FileFactory.getUpdatedFilePath(rawFileName, FileFactory.FileType.HDFS);
+    } else {
+      // randomly choose a temp location for index file
+      String[] tempLocations = model.getStoreLocation();
+      String chosenTempLocation = tempLocations[new Random().nextInt(tempLocations.length)];
+      LOGGER.info("Randomly choose index file location: " + chosenTempLocation);
+      indexFileName = chosenTempLocation + File.separator + CarbonTablePath
+          .getCarbonIndexFileName(model.getCarbonDataFileAttributes().getTaskId(),
+              model.getBucketId(), model.getTaskExtension(),
+              "" + model.getCarbonDataFileAttributes().getFactTimeStamp());
+    }
 
-    String fileName = chosenTempLocation + File.separator + CarbonTablePath
-        .getCarbonIndexFileName(model.getCarbonDataFileAttributes().getTaskId(),
-            model.getBucketId(), model.getTaskExtension(),
-            "" + model.getCarbonDataFileAttributes().getFactTimeStamp());
     CarbonIndexFileWriter writer = new CarbonIndexFileWriter();
     // open file
-    writer.openThriftWriter(fileName);
+    writer.openThriftWriter(indexFileName);
     // write the header first
     writer.writeThrift(indexHeader);
     // write the indexes
@@ -414,10 +417,14 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
       writer.writeThrift(blockIndex);
     }
     writer.close();
-    // copy from temp to actual store location
-    CarbonUtil.copyCarbonDataFileToCarbonStorePath(fileName,
-            model.getCarbonDataDirectoryPath(),
-            fileSizeInBytes);
+    if (enableDirectlyWriteData2Hdfs) {
+      executorServiceSubmitList.add(executorService.submit(
+          new CompleteHdfsBackendThread(indexFileName, FileFactory.FileType.HDFS)));
+    } else {
+      CarbonUtil.copyCarbonDataFileToCarbonStorePath(indexFileName,
+          model.getCarbonDataDirectoryPath(),
+          fileSizeInBytes);
+    }
   }
 
   /**
@@ -435,27 +442,29 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
         executorServiceSubmitList.get(i).get();
       }
     } catch (InterruptedException | ExecutionException | IOException e) {
-      LOGGER.error(e, "Error while finishing writer");
-      throw new CarbonDataWriterException(e.getMessage());
+      throw new CarbonDataWriterException(e);
     }
   }
 
 
 
   /**
-   * This method will copy the carbon data file from local store location to
-   * carbon store location
+   * This method will complete hdfs backend storage for this file.
+   * It may copy the carbon data file from local store location to carbon store location,
+   * it may also complete the remaining replications for the existing hdfs file.
    */
-  private final class CopyThread implements Callable<Void> {
+  private final class CompleteHdfsBackendThread implements Callable<Void> {
 
     /**
      * complete path along with file name which needs to be copied to
      * carbon store path
      */
     private String fileName;
+    private FileFactory.FileType fileType;
 
-    private CopyThread(String fileName) {
+    private CompleteHdfsBackendThread(String fileName, FileFactory.FileType fileType) {
       this.fileName = fileName;
+      this.fileType = fileType;
     }
 
     /**
@@ -464,13 +473,16 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
      * @return computed result
      * @throws Exception if unable to compute a result
      */
-    @Override public Void call() throws Exception {
-      CarbonUtil.copyCarbonDataFileToCarbonStorePath(
-          fileName,
-          model.getCarbonDataDirectoryPath(),
-          fileSizeInBytes);
+    @Override
+    public Void call() throws Exception {
+      if (FileFactory.FileType.HDFS == fileType) {
+        CarbonUtil.completeRemainingHdfsReplicas(fileName, fileType);
+      } else {
+        CarbonUtil.copyCarbonDataFileToCarbonStorePath(fileName,
+            model.getCarbonDataDirectoryPath(),
+            fileSizeInBytes);
+      }
       return null;
     }
-
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/41190629/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
index 1c9ccc8..3e9be7e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
@@ -18,7 +18,6 @@ package org.apache.carbondata.processing.store.writer.v3;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -80,11 +79,11 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter {
     blockletDataHolder = new BlockletDataHolder();
   }
 
-  @Override protected void writeBlockletInfoToFile(FileChannel channel, String filePath)
+  @Override protected void writeBlockletInfoToFile()
       throws CarbonDataWriterException {
     try {
       // get the current file position
-      long currentPosition = channel.size();
+      long currentPosition = currentOffsetInFile;
       // get thrift file footer instance
       FileFooter3 convertFileMeta = CarbonMetadataUtil
           .convertFileFooterVersion3(blockletMetadata, blockletIndex, localCardinality,
@@ -98,7 +97,7 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter {
       buffer.put(byteArray);
       buffer.putLong(currentPosition);
       buffer.flip();
-      channel.write(buffer);
+      currentOffsetInFile += fileChannel.write(buffer);
     } catch (IOException e) {
       LOGGER.error(e, "Problem while writing the carbon file");
       throw new CarbonDataWriterException("Problem while writing the carbon file: ", e);
@@ -178,11 +177,11 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter {
 
     // write data to file
     try {
-      if (fileChannel.size() == 0) {
+      if (currentOffsetInFile == 0) {
         // write the header if file is empty
-        writeHeaderToFile(fileChannel);
+        writeHeaderToFile();
       }
-      writeBlockletToFile(fileChannel, dataChunkBytes);
+      writeBlockletToFile(dataChunkBytes);
       if (listener != null) {
         listener.onBlockletEnd(blockletId++);
       }
@@ -227,12 +226,12 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter {
   /**
    * write file header
    */
-  private void writeHeaderToFile(FileChannel channel) throws IOException {
+  private void writeHeaderToFile() throws IOException {
     byte[] fileHeader = CarbonUtil.getByteArray(
         CarbonMetadataUtil.getFileHeader(
             true, thriftColumnSchemaList, model.getSchemaUpdatedTimeStamp()));
     ByteBuffer buffer = ByteBuffer.wrap(fileHeader);
-    channel.write(buffer);
+    currentOffsetInFile += fileChannel.write(buffer);
   }
 
   /**
@@ -243,9 +242,9 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter {
    * <Column3 Data ChunkV3><Column3<Page1><Page2><Page3><Page4>>
    * <Column4 Data ChunkV3><Column4<Page1><Page2><Page3><Page4>>
    */
-  private void writeBlockletToFile(FileChannel channel, byte[][] dataChunkBytes)
+  private void writeBlockletToFile(byte[][] dataChunkBytes)
       throws IOException {
-    long offset = channel.size();
+    long offset = currentOffsetInFile;
     // to maintain the offset of each data chunk in blocklet
     List<Long> currentDataChunksOffset = new ArrayList<>();
     // to maintain the length of each data chunk in blocklet
@@ -265,13 +264,13 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter {
       currentDataChunksOffset.add(offset);
       currentDataChunksLength.add(dataChunkBytes[i].length);
       buffer = ByteBuffer.wrap(dataChunkBytes[i]);
-      channel.write(buffer);
+      currentOffsetInFile += fileChannel.write(buffer);
       offset += dataChunkBytes[i].length;
       for (EncodedTablePage encodedTablePage : encodedTablePages) {
         EncodedColumnPage dimension = encodedTablePage.getDimension(i);
         buffer = dimension.getEncodedData();
         int bufferSize = buffer.limit();
-        channel.write(buffer);
+        currentOffsetInFile += fileChannel.write(buffer);
         offset += bufferSize;
       }
     }
@@ -281,14 +280,14 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter {
       currentDataChunksOffset.add(offset);
       currentDataChunksLength.add(dataChunkBytes[dataChunkStartIndex].length);
       buffer = ByteBuffer.wrap(dataChunkBytes[dataChunkStartIndex]);
-      channel.write(buffer);
+      currentOffsetInFile += fileChannel.write(buffer);
       offset += dataChunkBytes[dataChunkStartIndex].length;
       dataChunkStartIndex++;
       for (EncodedTablePage encodedTablePage : encodedTablePages) {
         EncodedColumnPage measure = encodedTablePage.getMeasure(i);
         buffer = measure.getEncodedData();
         int bufferSize = buffer.limit();
-        channel.write(buffer);
+        currentOffsetInFile += fileChannel.write(buffer);
         offset += bufferSize;
       }
     }
@@ -360,7 +359,7 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter {
 
   @Override public void writeFooterToFile() throws CarbonDataWriterException {
     if (this.blockletMetadata.size() > 0) {
-      writeBlockletInfoToFile(fileChannel, carbonDataFileTempPath);
+      writeBlockletInfoToFile();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/41190629/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index a948538..922a7ee 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -63,6 +63,7 @@ import org.apache.carbondata.processing.merger.NodeMultiBlockRelation;
 import static org.apache.carbondata.core.enums.EscapeSequences.*;
 
 import com.google.gson.Gson;
+import org.apache.commons.lang3.StringUtils;
 
 public final class CarbonLoaderUtil {
 
@@ -957,9 +958,13 @@ public final class CarbonLoaderUtil {
             infos.add(block);
             nodeCapacity++;
             if (LOGGER.isDebugEnabled()) {
-              LOGGER.debug(
-                  "First Assignment iteration: " + ((TableBlockInfo) block).getFilePath() + '-'
-                      + ((TableBlockInfo) block).getBlockLength() + "-->" + activeExecutor);
+              try {
+                LOGGER.debug("First Assignment iteration: block("
+                    + StringUtils.join(block.getLocations(), ", ")
+                    + ")-->" + activeExecutor);
+              } catch (IOException e) {
+                LOGGER.error(e);
+              }
             }
             remainingBlocks.remove(block);
           } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/41190629/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
index e79f003..aae6f03 100644
--- a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
+++ b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
@@ -405,29 +405,6 @@ public class StoreCreator {
 
     writeLoadMetadata(loadModel.getCarbonDataLoadSchema(), loadModel.getTableName(), loadModel.getTableName(),
         new ArrayList<LoadMetadataDetails>());
-
-    String segLocation =
-        storeLocation + "/" + databaseName + "/" + tableName + "/Fact/Part0/Segment_0";
-    File file = new File(segLocation);
-    File factFile = null;
-    File[] folderList = file.listFiles();
-    File folder = null;
-    for (int i = 0; i < folderList.length; i++) {
-      if (folderList[i].isDirectory()) {
-        folder = folderList[i];
-      }
-    }
-    if (folder.isDirectory()) {
-      File[] files = folder.listFiles();
-      for (int i = 0; i < files.length; i++) {
-        if (!files[i].isDirectory() && files[i].getName().startsWith("part")) {
-          factFile = files[i];
-          break;
-        }
-      }
-      factFile.renameTo(new File(segLocation + "/" + factFile.getName()));
-      CarbonUtil.deleteFoldersAndFiles(folder);
-    }
   }
 
   public static void writeLoadMetadata(CarbonDataLoadSchema schema, String databaseName,


[04/13] carbondata git commit: [CARBONDATA-2222] Update the FAQ doc for some mistakes

Posted by ja...@apache.org.
[CARBONDATA-2222] Update the FAQ doc for some mistakes

Update the FAQ doc for some mistakes

This closes #2029


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6ae56b92
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6ae56b92
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6ae56b92

Branch: refs/heads/master
Commit: 6ae56b92dc1aa5be3e3ea6b48400cbf6a065d3dc
Parents: a161841
Author: chenerlu <ch...@huawei.com>
Authored: Sun Mar 4 23:39:40 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Mar 14 12:09:57 2018 +0800

----------------------------------------------------------------------
 docs/faq.md | 28 ++++++++++++++--------------
 1 file changed, 14 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/6ae56b92/docs/faq.md
----------------------------------------------------------------------
diff --git a/docs/faq.md b/docs/faq.md
index 8f04e4f..b5f8254 100644
--- a/docs/faq.md
+++ b/docs/faq.md
@@ -25,7 +25,7 @@
 * [What is Carbon Lock Type?](#what-is-carbon-lock-type)
 * [How to resolve Abstract Method Error?](#how-to-resolve-abstract-method-error)
 * [How Carbon will behave when execute insert operation in abnormal scenarios?](#how-carbon-will-behave-when-execute-insert-operation-in-abnormal-scenarios)
-* [Why aggregate query is not fetching data from aggregate table?] (#why-aggregate-query-is-not-fetching-data-from-aggregate-table)
+* [Why aggregate query is not fetching data from aggregate table?](#why-aggregate-query-is-not-fetching-data-from-aggregate-table)
 
 ## What are Bad Records?
 Records that fail to get loaded into the CarbonData due to data type incompatibility or are empty or have incompatible format are classified as Bad Records.
@@ -143,38 +143,38 @@ INSERT INTO TABLE carbon_table SELECT id, city FROM source_table;
 When the column type in carbon table is different from the column specified in select statement. The insert operation will still success, but you may get NULL in result, because NULL will be substitute value when conversion type failed.
 
 ## Why aggregate query is not fetching data from aggregate table?
-Following are the aggregate queries that won’t fetch data from aggregate table:
+Following are the aggregate queries that won't fetch data from aggregate table:
 
 - **Scenario 1** :
 When SubQuery predicate is present in the query.
 
-Example 
+Example:
 
 ```
-create table gdp21(cntry smallint, gdp double, y_year date) stored by 'carbondata'
-create datamap ag1 on table gdp21 using 'preaggregate' as select cntry, sum(gdp) from gdp group by ctry;
-select ctry from pop1 where ctry in (select cntry from gdp21 group by cntry)
+create table gdp21(cntry smallint, gdp double, y_year date) stored by 'carbondata';
+create datamap ag1 on table gdp21 using 'preaggregate' as select cntry, sum(gdp) from gdp21 group by cntry;
+select ctry from pop1 where ctry in (select cntry from gdp21 group by cntry);
 ```
 
 - **Scenario 2** : 
-When aggregate function along with ‘in’ filter. 
+When aggregate function along with 'in' filter.
 
-Example.
+Example:
 
 ```
-create table gdp21(cntry smallint, gdp double, y_year date) stored by 'carbondata'
-create datamap ag1 on table gdp21 using 'preaggregate' as select cntry, sum(gdp) from gdp group by ctry;
+create table gdp21(cntry smallint, gdp double, y_year date) stored by 'carbondata';
+create datamap ag1 on table gdp21 using 'preaggregate' as select cntry, sum(gdp) from gdp21 group by cntry;
 select cntry, sum(gdp) from gdp21 where cntry in (select ctry from pop1) group by cntry;
 ```
 
 - **Scenario 3** : 
-When aggregate function having ‘join’ with Equal filter.
+When aggregate function having 'join' with equal filter.
 
-Example.
+Example:
 
 ```
-create table gdp21(cntry smallint, gdp double, y_year date) stored by 'carbondata'
-create datamap ag1 on table gdp21 using 'preaggregate' as select cntry, sum(gdp) from gdp group by ctry;
+create table gdp21(cntry smallint, gdp double, y_year date) stored by 'carbondata';
+create datamap ag1 on table gdp21 using 'preaggregate' as select cntry, sum(gdp) from gdp21 group by cntry;
 select cntry,sum(gdp) from gdp21,pop1 where cntry=ctry group by cntry;
 ```
 


[07/13] carbondata git commit: [CARBONDATA-2288] [Test] Exception is Masked Inside StandardPartitionTableQueryTestCase

Posted by ja...@apache.org.
[CARBONDATA-2288] [Test] Exception is Masked Inside StandardPartitionTableQueryTestCase

This closes #2034


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/334a420a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/334a420a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/334a420a

Branch: refs/heads/master
Commit: 334a420ad59dca166f11945092daf9d7de154fe8
Parents: 76135d8
Author: anubhav100 <an...@knoldus.in>
Authored: Tue Mar 6 12:08:48 2018 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Mar 14 12:11:11 2018 +0800

----------------------------------------------------------------------
 .../StandardPartitionTableQueryTestCase.scala   | 23 +++++++++++++++++++-
 1 file changed, 22 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/334a420a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
index 163e662..4cce7d5 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
@@ -17,6 +17,7 @@
 package org.apache.carbondata.spark.testsuite.standardpartition
 
 import org.apache.spark.sql.execution.BatchedDataSourceScanExec
+import org.apache.spark.sql.test.Spark2TestQueryExecutor
 import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.sql.{DataFrame, Row}
 import org.scalatest.BeforeAndAfterAll
@@ -244,11 +245,31 @@ class StandardPartitionTableQueryTestCase extends QueryTest with BeforeAndAfterA
   }
 
 test("Creation of partition table should fail if the colname in table schema and partition column is same even if both are case sensitive"){
-  intercept[Exception]{
+
+  val exception = intercept[Exception]{
     sql("CREATE TABLE uniqdata_char2(name char,id int) partitioned by (NAME char)stored by 'carbondata' ")
   }
+  if(Spark2TestQueryExecutor.spark.version.startsWith("2.1.0")){
+    assert(exception.getMessage.contains("Operation not allowed: Partition columns should not be " +
+                                         "specified in the schema: [\"name\"]"))
+  }
+  else{
+    //spark 2.2 allow creating char data type only with digits thats why this assert is here as it will throw this exception
+    assert(exception.getMessage.contains("DataType char is not supported"))
+
+  }
 }
 
+  test("Creation of partition table should fail for both spark version with same exception when char data type is created with specified digit and colname in table schema and partition column is same even if both are case sensitive"){
+
+    sql("DROP TABLE IF EXISTS UNIQDATA_CHAR2")
+    val exception = intercept[Exception]{
+      sql("CREATE TABLE uniqdata_char2(name char(10),id int) partitioned by (NAME char(10))stored by 'carbondata' ")
+    }
+    assert(exception.getMessage.contains("Operation not allowed: Partition columns should not be " +
+                                         "specified in the schema: [\"name\"]"))
+  }
+
   test("Renaming a partition table should fail"){
     sql("drop table if exists partitionTable")
     sql(


[11/13] carbondata git commit: [CARBONDATA-2226] Removed redundant and unnecessary test cases to improve CI time for PreAggregation Create and Drop datamap feature

Posted by ja...@apache.org.
[CARBONDATA-2226] Removed redundant and unnecessary test cases to improve CI time for PreAggregation Create and Drop datamap feature

Description: Removed redundant and unnecessary test cases to improve CI time for PreAggregation Create and Drop datamap feature

This closes #2035


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/76135d87
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/76135d87
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/76135d87

Branch: refs/heads/master
Commit: 76135d8784ec7324412a1e2e5178792d519adbb5
Parents: e9dadfe
Author: SangeetaGulia <sa...@knoldus.in>
Authored: Tue Mar 6 11:03:38 2018 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Mar 14 12:11:11 2018 +0800

----------------------------------------------------------------------
 .../preaggregate/TestPreAggCreateCommand.scala  | 101 ++++++-------------
 .../preaggregate/TestPreAggregateDrop.scala     |  10 +-
 2 files changed, 35 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/76135d87/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
index 50b8bec..cd87913 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
@@ -39,18 +39,14 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
     sql("drop database if exists otherDB cascade")
     sql("drop table if exists PreAggMain")
     sql("drop table if exists PreAggMain1")
-    sql("drop table if exists PreAggMain2")
     sql("drop table if exists maintable")
     sql("drop table if exists showTables")
     sql("drop table if exists Preagg_twodb")
     sql("create table preaggMain (a string, b string, c string) stored by 'carbondata'")
     sql("create table preaggMain1 (a string, b string, c string) stored by 'carbondata' tblProperties('DICTIONARY_INCLUDE' = 'a')")
-    sql("create table preaggMain2 (a string, b string, c string) stored by 'carbondata'")
     sql("create table maintable (column1 int, column6 string, column5 string, column2 string, column3 int, column4 int) stored by 'carbondata' tblproperties('dictionary_include'='column1,column6', 'dictionary_exclude'='column3,column5')")
-
   }
 
-
   test("test pre agg create table 1") {
     sql("create datamap preagg1 on table PreAggMain using 'preaggregate' as select a,sum(b) from PreAggMain group by a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg1"), true, "preaggmain_a")
@@ -59,27 +55,12 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
   }
 
   test("test pre agg create table 2") {
-    sql("create datamap preagg2 on table PreAggMain using 'preaggregate' as select a as a1,sum(b) from PreAggMain group by a")
+    sql("create datamap preagg2 on table PreAggMain using 'preaggregate' as select a as a1,sum(b) as udfsum from PreAggMain group by a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg2"), true, "preaggmain_a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg2"), true, "preaggmain_b_sum")
     sql("drop datamap preagg2 on table PreAggMain")
   }
 
-  test("test pre agg create table 3") {
-    sql("create datamap preagg3 on table PreAggMain using 'preaggregate' as select a,sum(b) as sum from PreAggMain group by a")
-    checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg3"), true, "preaggmain_a")
-    checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg3"), true, "preaggmain_b_sum")
-    sql("drop datamap preagg3 on table PreAggMain")
-  }
-
-  test("test pre agg create table 4") {
-    sql("create datamap preagg4 on table PreAggMain using 'preaggregate' as select a as a1,sum(b) as sum from PreAggMain group by a")
-    checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg4"), true, "preaggmain_a")
-    checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg4"), true, "preaggmain_b_sum")
-    sql("drop datamap preagg4 on table PreAggMain")
-  }
-
-
   test("test pre agg create table 5") {
     sql("create datamap preagg11 on table PreAggMain1 using 'preaggregate'as select a,sum(b) from PreAggMain1 group by a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg11"), true, "preaggmain1_a")
@@ -88,22 +69,6 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
     sql("drop datamap preagg11 on table PreAggMain1")
   }
 
-  test("test pre agg create table 6") {
-    sql("create datamap preagg12 on table PreAggMain1 using 'preaggregate' as select a as a1,sum(b) from PreAggMain1 group by a")
-    checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg12"), true, "preaggmain1_a")
-    checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg12"), true, "preaggmain1_b_sum")
-    checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg12"), true, "DICTIONARY")
-    sql("drop datamap preagg12 on table PreAggMain1")
-  }
-
-  test("test pre agg create table 7") {
-    sql("create datamap preagg13 on table PreAggMain1 using 'preaggregate' as select a,sum(b) as sum from PreAggMain1 group by a")
-    checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg13"), true, "preaggmain1_a")
-    checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg13"), true, "preaggmain1_b_sum")
-    checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg13"), true, "DICTIONARY")
-    sql("drop datamap preagg13 on table PreAggMain1")
-  }
-
   test("test pre agg create table 8") {
     sql("create datamap preagg14 on table PreAggMain1 using 'preaggregate' as select a as a1,sum(b) as sum from PreAggMain1 group by a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg14"), true, "preaggmain1_a")
@@ -112,68 +77,69 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
     sql("drop datamap preagg14 on table PreAggMain1")
   }
 
-
   test("test pre agg create table 9") {
-    sql("create datamap preagg15 on table PreAggMain2 using 'preaggregate' as select a,avg(b) from PreAggMain2 group by a")
-    checkExistence(sql("DESCRIBE FORMATTED PreAggMain2_preagg15"), true, "preaggmain2_a")
-    checkExistence(sql("DESCRIBE FORMATTED PreAggMain2_preagg15"), true, "preaggmain2_b_sum")
-    checkExistence(sql("DESCRIBE FORMATTED PreAggMain2_preagg15"), true, "preaggmain2_b_count")
-    sql("drop datamap preagg15 on table PreAggMain2")
+    sql("create datamap preagg15 on table PreAggMain using 'preaggregate' as select a,avg(b) from PreAggMain group by a")
+    checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg15"), true, "preaggmain_a")
+    checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg15"), true, "preaggmain_b_sum")
+    checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg15"), true, "preaggmain_b_count")
+    sql("drop datamap preagg15 on table PreAggMain")
   }
 
   test("test pre agg create table 10") {
-    sql("create datamap preagg16 on table PreAggMain2 using 'preaggregate' as select a as a1,max(b) from PreAggMain2 group by a")
-    checkExistence(sql("DESCRIBE FORMATTED PreAggMain2_preagg16"), true, "preaggmain2_a")
-    checkExistence(sql("DESCRIBE FORMATTED PreAggMain2_preagg16"), true, "preaggmain2_b_max")
-    sql("drop datamap preagg16 on table PreAggMain2")
+    sql("create datamap preagg16 on table PreAggMain using 'preaggregate' as select a as a1,max(b) from PreAggMain group by a")
+    checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg16"), true, "preaggmain_a")
+    checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg16"), true, "preaggmain_b_max")
+    sql("drop datamap preagg16 on table PreAggMain")
   }
 
   test("test pre agg create table 11") {
-    sql("create datamap preagg17 on table PreAggMain2 using 'preaggregate' as select a,min(b) from PreAggMain2 group by a")
-    checkExistence(sql("DESCRIBE FORMATTED PreAggMain2_preagg17"), true, "preaggmain2_a")
-    checkExistence(sql("DESCRIBE FORMATTED PreAggMain2_preagg17"), true, "preaggmain2_b_min")
-    sql("drop datamap preagg17 on table PreAggMain2")
+    sql("create datamap preagg17 on table PreAggMain using 'preaggregate' as select a,min(b) from PreAggMain group by a")
+    checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg17"), true, "preaggmain_a")
+    checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg17"), true, "preaggmain_b_min")
+    sql("drop datamap preagg17 on table PreAggMain")
   }
 
   test("test pre agg create table 12") {
-    sql("create datamap preagg18 on table PreAggMain2 using 'preaggregate' as select a as a1,count(b) from PreAggMain2 group by a")
-    checkExistence(sql("DESCRIBE FORMATTED PreAggMain2_preagg18"), true, "preaggmain2_a")
-    checkExistence(sql("DESCRIBE FORMATTED PreAggMain2_preagg18"), true, "preaggmain2_b_count")
-    sql("drop datamap preagg18 on table PreAggMain2")
+    sql("create datamap preagg18 on table PreAggMain using 'preaggregate' as select a as a1,count(b) from PreAggMain group by a")
+    checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg18"), true, "preaggmain_a")
+    checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg18"), true, "preaggmain_b_count")
+    sql("drop datamap preagg18 on table PreAggMain")
   }
 
   test("test pre agg create table 13") {
-    intercept[Exception] {
+    val exception: Exception = intercept[MalformedCarbonCommandException] {
       sql(
         s"""
-           | create datamap preagg19 on table PreAggMain2
+           | create datamap preagg19 on table PreAggMain
            | using 'preaggregate'
            | as select a as a1,count(distinct b)
-           | from PreAggMain2 group by a
+           | from PreAggMain group by a
          """.stripMargin)
     }
+    assert(exception.getMessage.equals("Distinct is not supported On Pre Aggregation"))
   }
 
   test("test pre agg create table 14") {
-    intercept[Exception] {
+    val exception = intercept[MalformedCarbonCommandException] {
       sql(
         s"""
-           | create datamap preagg20 on table PreAggMain2
+           | create datamap preagg20 on table PreAggMain
            | using 'preaggregate'
-           | as select a as a1,sum(distinct b) from PreAggMain2
+           | as select a as a1,sum(distinct b) from PreAggMain
            | group by a
          """.stripMargin)
     }
+    assert(exception.getMessage.equals("Distinct is not supported On Pre Aggregation"))
   }
 
   test("test pre agg create table 15") {
     intercept[Exception] {
       sql(
         s"""
-           | create datamap preagg21 on table PreAggMain2
+           | create datamap preagg21 on table PreAggMain
            | using 'preaggregate'
            | as select a as a1,sum(b)
-           | from PreAggMain2
+           | from PreAggMain
            | where a='vishal'
            | group by a
          """.stripMargin)
@@ -342,15 +308,13 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
   }
 
 
-  test("test show tables filterted with datamaps") {
+  test("test show tables filtered with datamaps") {
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SHOW_DATAMAPS,"false")
-    sql("create table showTables(name string, age int) stored by 'carbondata'")
-    sql(
-      "create datamap preAgg on table showTables using 'preaggregate' as select sum(age) from showTables")
+    sql("create datamap preagg1 on table PreAggMain using 'preaggregate' as select a,sum(b) from PreAggMain group by a")
     sql("show tables").show()
-    checkExistence(sql("show tables"), false, "showtables_preagg")
+    checkExistence(sql("show tables"), false, "preaggmain_preagg1")
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SHOW_DATAMAPS,CarbonCommonConstants.CARBON_SHOW_DATAMAPS_DEFAULT)
-    checkExistence(sql("show tables"), true, "showtables_preagg")
+    checkExistence(sql("show tables"), true, "preaggmain_preagg1")
   }
 
   test("test create main and preagg table of same name in two database") {
@@ -401,7 +365,6 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists maintable")
     sql("drop table if exists PreAggMain")
     sql("drop table if exists PreAggMain1")
-    sql("drop table if exists PreAggMain2")
     sql("drop table if exists maintabletime")
     sql("drop table if exists showTables")
     sql("drop table if exists Preagg_twodb")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/76135d87/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
index 2c7c593..a96a19d 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
@@ -45,9 +45,8 @@ class TestPreAggregateDrop extends QueryTest with BeforeAndAfterAll {
       "create datamap preagg2 on table maintable using 'preaggregate' as select" +
       " a,sum(c) from maintable group by a")
     sql("drop datamap if exists preagg2 on table maintable")
-    val showTables = sql("show tables")
     val showdatamaps =sql("show datamap on table maintable")
-    checkExistence(showTables, false, "maintable_preagg2")
+    checkExistence(showdatamaps, false, "maintable_preagg2")
     checkExistence(showdatamaps, true, "maintable_preagg1")
   }
 
@@ -85,14 +84,11 @@ class TestPreAggregateDrop extends QueryTest with BeforeAndAfterAll {
     checkExistence(showTables, false, "maintable_preagg_same1")
     sql("create datamap preagg_same1 on table maintable using 'preaggregate' as select" +
         " a,sum(c) from maintable group by a")
-    showTables = sql("show tables")
-    val showdatamaps =sql("show datamap on table maintable")
-    checkExistence(showdatamaps, true, "maintable_preagg_same1")
+    val showDatamaps =sql("show datamap on table maintable")
+    checkExistence(showDatamaps, true, "maintable_preagg_same1")
     sql("drop datamap preagg_same1 on table maintable")
   }
 
-
-
   test("drop main table and check if preaggreagte is deleted") {
     sql(
       "create datamap preagg2 on table maintable using 'preaggregate' as select" +


[13/13] carbondata git commit: [CARBONDATA-2139] Optimize CTAS documentation and test case

Posted by ja...@apache.org.
[CARBONDATA-2139] Optimize CTAS documentation and test case

Optimize CTAS:

optimize documentation
add test case
drop tables after finishing the test cases and remove the table files from disk

This closes #1939


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/881ea1e1
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/881ea1e1
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/881ea1e1

Branch: refs/heads/master
Commit: 881ea1e12756909ef174111e7878621749b39560
Parents: 5ab3099
Author: xubo245 <60...@qq.com>
Authored: Wed Feb 7 12:16:41 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Mar 14 12:11:12 2018 +0800

----------------------------------------------------------------------
 .gitignore                                      |   2 +-
 docs/data-management-on-carbondata.md           |  46 ++-
 .../generated/CreateTableAsSelectTestCase.scala |  45 +++
 .../createTable/TestCreateTableAsSelect.scala   | 280 +++++++++++++++++--
 4 files changed, 345 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/881ea1e1/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 2753645..5d66a40 100644
--- a/.gitignore
+++ b/.gitignore
@@ -15,4 +15,4 @@ target/
 .project
 .classpath
 metastore_db/
-derby.log
\ No newline at end of file
+derby.log

http://git-wip-us.apache.org/repos/asf/carbondata/blob/881ea1e1/docs/data-management-on-carbondata.md
----------------------------------------------------------------------
diff --git a/docs/data-management-on-carbondata.md b/docs/data-management-on-carbondata.md
index cb9a17c..bd4afdc 100644
--- a/docs/data-management-on-carbondata.md
+++ b/docs/data-management-on-carbondata.md
@@ -139,6 +139,41 @@ This tutorial is going to introduce all commands and data operations on CarbonDa
                    'SORT_SCOPE'='NO_SORT')
    ```
 
+## CREATE TABLE AS SELECT
+  This function allows user to create a Carbon table from any of the Parquet/Hive/Carbon table. This is beneficial when the user wants to create Carbon table from any other Parquet/Hive table and use the Carbon query engine to query and achieve better query results for cases where Carbon is faster than other file formats. Also this feature can be used for backing up the data.
+### Syntax
+  ```
+  CREATE TABLE [IF NOT EXISTS] [db_name.]table_name 
+  STORED BY 'carbondata' 
+  [TBLPROPERTIES (key1=val1, key2=val2, ...)] 
+  AS select_statement;
+  ```
+
+### Examples
+  ```
+  carbon.sql("CREATE TABLE source_table(
+                             id INT,
+                             name STRING,
+                             city STRING,
+                             age INT)
+              STORED AS parquet")
+  carbon.sql("INSERT INTO source_table SELECT 1,'bob','shenzhen',27")
+  carbon.sql("INSERT INTO source_table SELECT 2,'david','shenzhen',31")
+  
+  carbon.sql("CREATE TABLE target_table
+              STORED BY 'carbondata'
+              AS SELECT city,avg(age) FROM source_table GROUP BY city")
+              
+  carbon.sql("SELECT * FROM target_table").show
+    // results:
+    //    +--------+--------+
+    //    |    city|avg(age)|
+    //    +--------+--------+
+    //    |shenzhen|    29.0|
+    //    +--------+--------+
+
+  ```
+
 ## CREATE DATABASE 
   This function creates a new database. By default the database is created in Carbon store location, but you can also specify custom location.
   ```
@@ -150,17 +185,6 @@ This tutorial is going to introduce all commands and data operations on CarbonDa
   CREATE DATABASE carbon LOCATION “hdfs://name_cluster/dir1/carbonstore”;
   ```
 
-## CREATE TABLE As SELECT
-  This function allows you to create a Carbon table from any of the Parquet/Hive/Carbon table. This is beneficial when the user wants to create Carbon table from any other Parquet/Hive table and use the Carbon query engine to query and achieve better query results for cases where Carbon is faster than other file formats. Also this feature can be used for backing up the data.
-  ```
-  CREATE TABLE [IF NOT EXISTS] [db_name.]table_name STORED BY 'carbondata' [TBLPROPERTIES (key1=val1, key2=val2, ...)] AS select_statement;
-  ```
-
-### Examples
-  ```
-  CREATE TABLE ctas_select_parquet STORED BY 'carbondata' as select * from parquet_ctas_test;
-  ```
-   
 ## TABLE MANAGEMENT  
 
 ### SHOW TABLE

http://git-wip-us.apache.org/repos/asf/carbondata/blob/881ea1e1/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/CreateTableAsSelectTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/CreateTableAsSelectTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/CreateTableAsSelectTestCase.scala
index aa8c404..0e52f85 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/CreateTableAsSelectTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/CreateTableAsSelectTestCase.scala
@@ -132,6 +132,34 @@ class CreateTableAsSelectTestCase extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS ctas_select_hugedata2").collect
   }
 
+  //Check create table as select with where clause in select from parquet table that does not return data
+  test("CreateTableAsSelect_001_14", Include) {
+    sql("DROP TABLE IF EXISTS ctas_select_where_parquet").collect
+    sql(
+      """
+        | CREATE TABLE ctas_select_where_parquet
+        | STORED BY 'carbondata'
+        | AS SELECT * FROM parquet_ctas_test
+        | WHERE key=300
+      """.stripMargin).collect
+    checkAnswer(sql("SELECT * FROM ctas_select_where_parquet"),
+      sql("SELECT * FROM parquet_ctas_test where key=300"))
+  }
+
+  //Check create table as select with where clause in select from hive/orc table that does not return data
+  test("CreateTableAsSelect_001_15", Include) {
+    sql("DROP TABLE IF EXISTS ctas_select_where_orc").collect
+    sql(
+      """
+        | CREATE TABLE ctas_select_where_orc
+        | STORED BY 'carbondata'
+        | AS SELECT * FROM orc_ctas_test
+        | WHERE key=100
+      """.stripMargin).collect
+    checkAnswer(sql("SELECT * FROM ctas_select_where_orc"), sql("SELECT * FROM orc_ctas_test WHERE key=100"))
+  }
+
+
   override protected def beforeAll() {
    // Dropping existing tables
    sql("DROP TABLE IF EXISTS carbon_ctas_test")
@@ -153,4 +181,21 @@ class CreateTableAsSelectTestCase extends QueryTest with BeforeAndAfterAll {
    sql("insert into orc_ctas_test select 100,'spark'")
    sql("insert into orc_ctas_test select 200,'hive'")
   }
+
+  override protected def afterAll(): Unit = {
+    sql("DROP TABLE IF EXISTS carbon_ctas_test")
+    sql("DROP TABLE IF EXISTS parquet_ctas_test")
+    sql("DROP TABLE IF EXISTS orc_ctas_test")
+    sql("DROP TABLE IF EXISTS ctas_same_table_name")
+    sql("DROP TABLE IF EXISTS ctas_select_carbon")
+    sql("DROP TABLE IF EXISTS ctas_select_direct_data")
+    sql("DROP TABLE IF EXISTS ctas_select_parquet")
+    sql("DROP TABLE IF EXISTS ctas_select_orc")
+    sql("DROP TABLE IF EXISTS ctas_select_where_carbon")
+    sql("DROP TABLE IF EXISTS ctas_select_where_parquet")
+    sql("DROP TABLE IF EXISTS ctas_select_where_orc")
+    sql("DROP TABLE IF EXISTS ctas_select_direct_data")
+    sql("DROP TABLE IF EXISTS ctas_select_hugedata1")
+    sql("DROP TABLE IF EXISTS ctas_select_hugedata2")
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/881ea1e1/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala
index 8315848..062e5ba 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala
@@ -17,14 +17,17 @@
 
 package org.apache.carbondata.spark.testsuite.createTable
 
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row}
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.test.Spark2TestQueryExecutor
-import org.apache.spark.sql.{CarbonEnv, Row}
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonProperties
 
 /**
  * test functionality for create table as select command
@@ -33,7 +36,7 @@ class TestCreateTableAsSelect extends QueryTest with BeforeAndAfterAll {
 
   private def createTablesAndInsertData {
     // create carbon table and insert data
-    sql("CREATE TABLE carbon_ctas_test(key INT, value STRING) STORED by 'carbondata'")
+    sql("CREATE TABLE carbon_ctas_test(key INT, value STRING) STORED BY 'carbondata'")
     sql("insert into carbon_ctas_test select 100,'spark'")
     sql("insert into carbon_ctas_test select 200,'hive'")
 
@@ -53,14 +56,30 @@ class TestCreateTableAsSelect extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS parquet_ctas_test")
     sql("DROP TABLE IF EXISTS orc_ctas_test")
     createTablesAndInsertData
+    CarbonProperties.getInstance().
+      addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD,
+        CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD)
   }
 
-  test("test create table as select with select from same table name when table exists") {
+  test("test create table as select with select from same carbon table name with if not exists clause") {
     sql("drop table if exists ctas_same_table_name")
-    sql("CREATE TABLE ctas_same_table_name(key INT, value STRING) STORED by 'carbondata'")
-    intercept[Exception] {
-      sql("create table ctas_same_table_name stored by 'carbondata' as select * from ctas_same_table_name")
+    sql("CREATE TABLE ctas_same_table_name(key INT, value STRING) STORED BY 'carbondata'")
+    checkExistence(sql("SHOW TABLES"), true, "ctas_same_table_name")
+    sql(
+      """
+        | CREATE TABLE IF NOT EXISTS ctas_same_table_name
+        | STORED BY 'carbondata'
+        | AS SELECT * FROM ctas_same_table_name
+      """.stripMargin)
+    val e = intercept[TableAlreadyExistsException] {
+      sql(
+        """
+          | CREATE TABLE ctas_same_table_name
+          | STORED BY 'carbondata'
+          | AS SELECT * FROM ctas_same_table_name
+        """.stripMargin)
     }
+    assert(e.getMessage().contains("Table or view 'ctas_same_table_name' already exists"))
   }
 
   test("test create table as select with select from same table name when table does not exists") {
@@ -70,13 +89,6 @@ class TestCreateTableAsSelect extends QueryTest with BeforeAndAfterAll {
     }
   }
 
-  test("test create table as select with select from same table name with if not exists clause") {
-    sql("drop table if exists ctas_same_table_name")
-    sql("CREATE TABLE ctas_same_table_name(key INT, value STRING) STORED by 'carbondata'")
-    sql("create table if not exists ctas_same_table_name stored by 'carbondata' as select * from ctas_same_table_name")
-    assert(true)
-  }
-
   test("test create table as select with select from another carbon table") {
     sql("DROP TABLE IF EXISTS ctas_select_carbon")
     sql("create table ctas_select_carbon stored by 'carbondata' as select * from carbon_ctas_test")
@@ -130,14 +142,14 @@ class TestCreateTableAsSelect extends QueryTest with BeforeAndAfterAll {
   test("test create table as select with select directly having the data") {
     sql("DROP TABLE IF EXISTS ctas_select_direct_data")
     sql("create table ctas_select_direct_data stored by 'carbondata' as select 300,'carbondata'")
-    checkAnswer(sql("select * from ctas_select_direct_data"), Seq(Row(300,"carbondata")))
+    checkAnswer(sql("select * from ctas_select_direct_data"), Seq(Row(300, "carbondata")))
   }
 
   test("test create table as select with TBLPROPERTIES") {
     sql("DROP TABLE IF EXISTS ctas_tblproperties_test")
     sql(
       "create table ctas_tblproperties_test stored by 'carbondata' TBLPROPERTIES" +
-      "('DICTIONARY_INCLUDE'='key', 'sort_scope'='global_sort') as select * from carbon_ctas_test")
+        "('DICTIONARY_INCLUDE'='key', 'sort_scope'='global_sort') as select * from carbon_ctas_test")
     checkAnswer(sql("select * from ctas_tblproperties_test"), sql("select * from carbon_ctas_test"))
     val carbonTable = CarbonEnv.getInstance(Spark2TestQueryExecutor.spark).carbonMetastore
       .lookupRelation(Option("default"), "ctas_tblproperties_test")(Spark2TestQueryExecutor.spark)
@@ -170,10 +182,246 @@ class TestCreateTableAsSelect extends QueryTest with BeforeAndAfterAll {
     }
   }
 
+  test("test create table as select with where clause in select from parquet table that does not return data") {
+    sql("DROP TABLE IF EXISTS ctas_select_where_parquet")
+    sql(
+      """
+        | CREATE TABLE ctas_select_where_parquet
+        | STORED BY 'carbondata'
+        | as select * FROM parquet_ctas_test
+        | where key=300""".stripMargin)
+    checkAnswer(sql("SELECT * FROM ctas_select_where_parquet"),
+      sql("SELECT * FROM parquet_ctas_test where key=300"))
+  }
+
+  test("test create table as select with where clause in select from hive/orc table that does not return data") {
+    sql("DROP TABLE IF EXISTS ctas_select_where_orc")
+    sql(
+      """
+        | CREATE TABLE ctas_select_where_orc
+        | STORED BY 'carbondata'
+        | AS SELECT * FROM orc_ctas_test
+        | where key=300""".stripMargin)
+    checkAnswer(sql("SELECT * FROM ctas_select_where_orc"),
+      sql("SELECT * FROM orc_ctas_test where key=300"))
+  }
+
+  test("test create table as select with select from same carbon table name with if not exists clause and source table not exists") {
+    sql("DROP TABLE IF EXISTS ctas_same_table_name")
+    checkExistence(sql("SHOW TABLES"), false, "ctas_same_table_name")
+    //TODO: should throw NoSuchTableException
+    val e = intercept[AnalysisException] {
+      sql(
+        """
+          | CREATE TABLE IF NOT EXISTS ctas_same_table_name
+          | STORED BY 'carbondata'
+          | AS SELECT * FROM ctas_same_table_name
+        """.stripMargin)
+    }
+    assert(e.getMessage().contains("Table or view not found: ctas_same_table_name"))
+  }
+
+  test("add example for documentation") {
+    sql("DROP TABLE IF EXISTS target_table")
+    sql("DROP TABLE IF EXISTS source_table")
+    // create carbon table and insert data
+    sql(
+      """
+        | CREATE TABLE source_table(
+        |     id INT,
+        |     name STRING,
+        |     city STRING,
+        |     age INT)
+        |     STORED AS parquet
+        |     """.stripMargin)
+    sql("INSERT INTO source_table SELECT 1,'bob','shenzhen',27")
+    sql("INSERT INTO source_table SELECT 2,'david','shenzhen',31")
+    sql(
+      """
+        | CREATE TABLE target_table
+        | STORED BY 'carbondata'
+        | AS
+        |   SELECT city,avg(age) FROM source_table group by city
+      """.stripMargin)
+    // results:
+    //    sql("SELECT * FROM target_table").show
+    //    +--------+--------+
+    //    |    city|avg(age)|
+    //    +--------+--------+
+    //    |shenzhen|    29.0|
+    //    +--------+--------+
+    checkAnswer(sql("SELECT * FROM target_table"), Seq(Row("shenzhen", 29)))
+  }
+
+  test("test create table as select with sum,count,min,max") {
+    sql("DROP TABLE IF EXISTS target_table")
+    sql("DROP TABLE IF EXISTS source_table")
+    // create carbon table and insert data
+    sql(
+      """
+        | CREATE TABLE source_table(
+        |     id INT,
+        |     name STRING,
+        |     city STRING,
+        |     age INT)
+        | STORED BY 'carbondata'
+      """.stripMargin)
+    sql("INSERT INTO source_table SELECT 1,'bob','shenzhen',27")
+    sql("INSERT INTO source_table SELECT 2,'david','shenzhen',31")
+    sql(
+      """
+        | CREATE TABLE target_table
+        | STORED BY 'carbondata'
+        | AS
+        |   SELECT city,sum(age),count(age),min(age),max(age)
+        |   FROM source_table group by city
+      """.stripMargin)
+    checkAnswer(sql("SELECT * FROM target_table"), Seq(Row("shenzhen", 58, 2, 27, 31)))
+  }
+
+  test("test create table as select with insert data into source_table after CTAS") {
+    sql("DROP TABLE IF EXISTS target_table")
+    sql("DROP TABLE IF EXISTS source_table")
+    // create carbon table and insert data
+    sql(
+      """
+        | CREATE TABLE source_table(
+        |     id INT,
+        |     name STRING,
+        |     city STRING,
+        |     age INT)
+        |     STORED BY 'carbondata'
+        |     """.stripMargin)
+    sql("INSERT INTO source_table SELECT 1,'bob','shenzhen',27")
+    sql("INSERT INTO source_table SELECT 2,'david','shenzhen',31")
+    sql(
+      """
+        | CREATE TABLE target_table
+        | STORED BY 'carbondata'
+        | AS
+        |   SELECT city,sum(age),count(age),min(age),max(age)
+        |   FROM source_table group by city
+      """.stripMargin)
+    sql("INSERT INTO source_table SELECT 1,'bob','shenzhen',27")
+    sql("INSERT INTO source_table SELECT 2,'david','shenzhen',31")
+    checkAnswer(sql("SELECT * FROM target_table"), Seq(Row("shenzhen", 58, 2, 27, 31)))
+  }
+
+  test("test create table as select with auto merge") {
+    CarbonProperties.getInstance().
+      addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
+    sql("DROP TABLE IF EXISTS target_table")
+    sql("DROP TABLE IF EXISTS source_table")
+    // create carbon table and insert data
+    sql(
+      """
+        | CREATE TABLE source_table(
+        |     id INT,
+        |     name STRING,
+        |     city STRING,
+        |     age INT)
+        |     STORED BY 'carbondata'
+        |     """.stripMargin)
+    sql("INSERT INTO source_table SELECT 1,'bob','shenzhen',27")
+    sql("INSERT INTO source_table SELECT 2,'david','shenzhen',31")
+    sql(
+      """
+        | CREATE TABLE target_table
+        | STORED BY 'carbondata'
+        | AS
+        |   SELECT city,avg(age)
+        |   FROM source_table group by city
+      """.stripMargin)
+    sql("INSERT INTO source_table SELECT 1,'bob','shenzhen',27")
+    sql("INSERT INTO source_table SELECT 2,'david','shenzhen',31")
+
+    checkExistence(sql("SHOW SEGMENTS FOR TABLE source_table"), true, "Compacted")
+    checkExistence(sql("SHOW SEGMENTS FOR TABLE target_table"), false, "Compacted")
+
+    sql("INSERT INTO target_table SELECT 'shenzhen',8")
+    sql("INSERT INTO target_table SELECT 'shenzhen',9")
+    sql("INSERT INTO target_table SELECT 'shenzhen',3")
+    checkExistence(sql("SHOW SEGMENTS FOR TABLE target_table"), true, "Compacted")
+    checkAnswer(sql("SELECT * FROM target_table"),
+      Seq(Row("shenzhen", 29), Row("shenzhen", 8), Row("shenzhen", 9), Row("shenzhen", 3)))
+    CarbonProperties.getInstance().
+      addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
+        CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE)
+  }
+
+  test("test create table as select with filter, <, and, >=") {
+    sql("DROP TABLE IF EXISTS target_table")
+    sql("DROP TABLE IF EXISTS source_table")
+    // create carbon table and insert data
+    sql(
+      """
+        | CREATE TABLE source_table(
+        |     id INT,
+        |     name STRING,
+        |     city STRING,
+        |     age INT)
+        |     STORED BY 'carbondata'
+        |     """.stripMargin)
+    sql("INSERT INTO source_table SELECT 1,'bob','shenzhen',27")
+    sql("INSERT INTO source_table SELECT 2,'david','shenzhen',31")
+    sql("INSERT INTO source_table SELECT 3,'jack','shenzhen',5")
+    sql("INSERT INTO source_table SELECT 4,'alice','shenzhen',35")
+    sql(
+      """
+        | CREATE TABLE target_table
+        | STORED BY 'carbondata'
+        | AS
+        |   SELECT city,avg(age)
+        |   FROM source_table where age > 20 and age <= 31 GROUP BY city
+      """.stripMargin)
+
+    checkAnswer(sql("SELECT * FROM target_table"), Seq(Row("shenzhen", 29)))
+  }
+
+  test("test create table as select with filter, >=, or, =") {
+    sql("DROP TABLE IF EXISTS target_table")
+    sql("DROP TABLE IF EXISTS source_table")
+    // create carbon table and insert data
+    sql(
+      """
+        | CREATE TABLE source_table(
+        |     id INT,
+        |     name STRING,
+        |     city STRING,
+        |     age INT)
+        |     STORED BY 'carbondata'
+        |     """.stripMargin)
+    sql("INSERT INTO source_table SELECT 1,'bob','shenzhen',27")
+    sql("INSERT INTO source_table SELECT 2,'david','shenzhen',31")
+    sql("INSERT INTO source_table SELECT 3,'jack','shenzhen',5")
+    sql("INSERT INTO source_table SELECT 4,'alice','shenzhen',35")
+    sql(
+      """
+        | CREATE TABLE target_table
+        | STORED BY 'carbondata'
+        | AS
+        |   SELECT city,avg(age)
+        |   FROM source_table where age >= 20 or age = 5 group by city
+      """.stripMargin)
+
+    checkAnswer(sql("SELECT * FROM target_table"), Seq(Row("shenzhen", 24.5)))
+  }
+
   override def afterAll {
     sql("DROP TABLE IF EXISTS carbon_ctas_test")
     sql("DROP TABLE IF EXISTS parquet_ctas_test")
     sql("DROP TABLE IF EXISTS orc_ctas_test")
+    sql("DROP TABLE IF EXISTS ctas_same_table_name")
+    sql("DROP TABLE IF EXISTS ctas_select_carbon")
+    sql("DROP TABLE IF EXISTS ctas_select_direct_data")
+    sql("DROP TABLE IF EXISTS ctas_select_parquet")
+    sql("DROP TABLE IF EXISTS ctas_select_orc")
+    sql("DROP TABLE IF EXISTS ctas_select_where_carbon")
+    sql("DROP TABLE IF EXISTS ctas_select_where_parquet")
+    sql("DROP TABLE IF EXISTS ctas_select_where_orc")
+    sql("DROP TABLE IF EXISTS ctas_tblproperties_test")
+    sql("DROP TABLE IF EXISTS ctas_if_table_name")
+    sql("DROP TABLE IF EXISTS source_table")
+    sql("DROP TABLE IF EXISTS target_table")
   }
-
 }


[02/13] carbondata git commit: [CARBONDATA-2232][DataLoad] Fix incorrect logic in spilling unsafe pages to disk

Posted by ja...@apache.org.
[CARBONDATA-2232][DataLoad] Fix incorrect logic in spilling unsafe pages to disk

An unsafe row page should be written to disk only when sort memory is
unavailable -- the previous logic had the condition reversed, spilling pages
exactly when memory was still available.
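
As a minimal, self-contained sketch of the corrected decision (hypothetical
stand-in names; the real logic is the Java helper createUnsafeRowPage shown in
the diff below), written here in Scala purely for illustration:

object SpillDecisionSketch {
  // Stand-in for UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(size)
  def sortMemoryAvailable(requestedSize: Long, freeSortMemory: Long): Boolean =
    requestedSize <= freeSortMemory

  // A page stays in memory when sort memory IS available and is spilled to
  // disk only when it is NOT, i.e. saveToDisk == !isMemoryAvailable. The
  // pre-fix code passed isMemoryAvailable directly, spilling exactly the
  // pages that could have stayed in memory.
  def shouldSpillToDisk(pageSize: Long, freeSortMemory: Long): Boolean =
    !sortMemoryAvailable(pageSize, freeSortMemory)

  def main(args: Array[String]): Unit = {
    println(shouldSpillToDisk(64, 128))  // false: enough sort memory, keep the page in memory
    println(shouldSpillToDisk(256, 128)) // true: not enough sort memory, spill to disk
  }
}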

This closes #2037


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a161841e
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a161841e
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a161841e

Branch: refs/heads/master
Commit: a161841e8808a3a477715346c8b28e683a5bc4d7
Parents: b509ad8
Author: xuchuanyin <xu...@hust.edu.cn>
Authored: Fri Mar 9 09:55:44 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Mar 14 12:09:57 2018 +0800

----------------------------------------------------------------------
 .../loading/sort/unsafe/UnsafeSortDataRows.java | 37 ++++++++------------
 1 file changed, 14 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/a161841e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
index eaa858e..7afda0e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
@@ -129,14 +129,7 @@ public class UnsafeSortDataRows {
    * This method will be used to initialize
    */
   public void initialize() throws MemoryException {
-    MemoryBlock baseBlock =
-        UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, inMemoryChunkSize);
-    boolean isMemoryAvailable =
-        UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(baseBlock.size());
-    if (isMemoryAvailable) {
-      UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(baseBlock.size());
-    }
-    this.rowPage = new UnsafeCarbonRowPage(tableFieldStat, baseBlock, !isMemoryAvailable, taskId);
+    this.rowPage = createUnsafeRowPage();
     // Delete if any older file exists in sort temp folder
     deleteSortLocationIfExists();
 
@@ -148,6 +141,17 @@ public class UnsafeSortDataRows {
     semaphore = new Semaphore(parameters.getNumberOfCores());
   }
 
+  private UnsafeCarbonRowPage createUnsafeRowPage() throws MemoryException {
+    MemoryBlock baseBlock =
+        UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, inMemoryChunkSize);
+    boolean isMemoryAvailable =
+        UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(baseBlock.size());
+    if (isMemoryAvailable) {
+      UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(baseBlock.size());
+    }
+    return new UnsafeCarbonRowPage(tableFieldStat, baseBlock, !isMemoryAvailable, taskId);
+  }
+
   public boolean canAdd() {
     return bytesAdded < maxSizeAllowed;
   }
@@ -192,14 +196,7 @@ public class UnsafeSortDataRows {
           unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible();
           semaphore.acquire();
           dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(rowPage));
-          MemoryBlock memoryBlock =
-              UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, inMemoryChunkSize);
-          boolean saveToDisk =
-              UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(memoryBlock.size());
-          if (!saveToDisk) {
-            UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(memoryBlock.size());
-          }
-          rowPage = new UnsafeCarbonRowPage(tableFieldStat, memoryBlock, saveToDisk, taskId);
+          rowPage = createUnsafeRowPage();
           bytesAdded += rowPage.addRow(rowBatch[i], rowBuffer.get());
         } catch (Exception e) {
           LOGGER.error(
@@ -227,13 +224,7 @@ public class UnsafeSortDataRows {
         unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible();
         semaphore.acquire();
         dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(rowPage));
-        MemoryBlock memoryBlock =
-            UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, inMemoryChunkSize);
-        boolean saveToDisk = UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(memoryBlock.size());
-        if (!saveToDisk) {
-          UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(memoryBlock.size());
-        }
-        rowPage = new UnsafeCarbonRowPage(tableFieldStat, memoryBlock, saveToDisk, taskId);
+        rowPage = createUnsafeRowPage();
         rowPage.addRow(row, rowBuffer.get());
       } catch (Exception e) {
         LOGGER.error(


[03/13] carbondata git commit: [HOTFIX] Fix findbugs

Posted by ja...@apache.org.
[HOTFIX] Fix findbugs


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b509ad8d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b509ad8d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b509ad8d

Branch: refs/heads/master
Commit: b509ad8dac826c886c1e75b8c0c7b2c624785f18
Parents: be600bc
Author: Jacky Li <ja...@qq.com>
Authored: Fri Mar 9 16:43:31 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Mar 14 12:09:57 2018 +0800

----------------------------------------------------------------------
 .../apache/carbondata/processing/merger/CarbonDataMergerUtil.java | 3 ---
 1 file changed, 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/b509ad8d/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 438e7db..ea5eb42 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -32,7 +32,6 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.locks.ICarbonLock;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.mutate.DeleteDeltaBlockDetails;
@@ -593,8 +592,6 @@ public final class CarbonDataMergerUtil {
         new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
 
     CarbonTable carbonTable = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable();
-    CarbonTableIdentifier tableIdentifier = carbonTable.getCarbonTableIdentifier();
-
 
     // total length
     long totalLength = 0;


[08/13] carbondata git commit: [CARBONDATA-2231] Removed redundant code from streaming test cases to improve CI time

Posted by ja...@apache.org.
[CARBONDATA-2231] Removed redundant code from streaming test cases to improve CI time
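
Besides dropping unused tables, the tightened negative tests capture the
exception object that ScalaTest's intercept returns and assert on its message,
instead of only checking that a MalformedCarbonCommandException was thrown. A
minimal sketch of that pattern (hypothetical test and message, not the
project's actual assertions, which are in the diff below):

import org.scalatest.FunSuite

class InterceptMessageSketch extends FunSuite {
  test("assert both the exception type and its message") {
    // intercept returns the caught exception, so its message can be verified too
    val ex = intercept[IllegalArgumentException] {
      require(false, "streaming should be either 'true' or 'false'")
    }
    assert(ex.getMessage.contains("streaming should be either"))
  }
}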

This closes #2036


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/07d4da7a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/07d4da7a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/07d4da7a

Branch: refs/heads/master
Commit: 07d4da7a24175d2504560ece4200abc539aa309f
Parents: 18380a6
Author: Geetika Gupta <ge...@knoldus.in>
Authored: Tue Mar 6 12:02:59 2018 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Mar 14 12:11:11 2018 +0800

----------------------------------------------------------------------
 .../TestStreamingTableOperation.scala           | 35 +++++++++-----------
 1 file changed, 15 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/07d4da7a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 3b599fc..aa00d07 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -91,8 +91,6 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     createTableWithComplexType(
       tableName = "stream_table_filter_complex", streaming = true, withBatchLoad = true)
 
-    // 10. fault tolerant
-    createTable(tableName = "stream_table_tolerant", streaming = true, withBatchLoad = true)
 
     // 11. table for delete segment test
     createTable(tableName = "stream_table_delete_id", streaming = true, withBatchLoad = false)
@@ -132,7 +130,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
         | TBLPROPERTIES ('streaming' = 'false')
       """.stripMargin)
     sql("DROP TABLE correct")
-    intercept[MalformedCarbonCommandException] {
+    val exceptionMsg = intercept[MalformedCarbonCommandException] {
       sql(
         """
           | create table wrong(
@@ -141,30 +139,37 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
           | TBLPROPERTIES ('streaming' = 'invalid')
         """.stripMargin)
     }
+    assert(exceptionMsg.getMessage.equals("Table property \'streaming\' should be either \'true\' or \'false\'"))
   }
 
   test("test blocking update and delete operation on streaming table") {
-    intercept[MalformedCarbonCommandException] {
+    val exceptionMsgUpdate = intercept[MalformedCarbonCommandException] {
       sql("""UPDATE source d SET (d.c2) = (d.c2 + 1) WHERE d.c1 = 'a'""").show()
     }
-    intercept[MalformedCarbonCommandException] {
+    val exceptionMsgDelete = intercept[MalformedCarbonCommandException] {
       sql("""DELETE FROM source WHERE d.c1 = 'a'""").show()
     }
+    assert(exceptionMsgUpdate.getMessage.equals("Data update is not allowed for streaming table"))
+    assert(exceptionMsgDelete.getMessage.equals("Date delete is not allowed for streaming table"))
   }
 
   test("test blocking alter table operation on streaming table") {
-    intercept[MalformedCarbonCommandException] {
+    val addColException = intercept[MalformedCarbonCommandException] {
       sql("""ALTER TABLE source ADD COLUMNS (c6 string)""").show()
     }
-    intercept[MalformedCarbonCommandException] {
+    val dropColException = intercept[MalformedCarbonCommandException] {
       sql("""ALTER TABLE source DROP COLUMNS (c1)""").show()
     }
-    intercept[MalformedCarbonCommandException] {
+    val renameException = intercept[MalformedCarbonCommandException] {
       sql("""ALTER TABLE source RENAME to t""").show()
     }
-    intercept[MalformedCarbonCommandException] {
-      sql("""ALTER TABLE source CHANGE c1 c1 int""").show()
+    val changeDataTypeException = intercept[MalformedCarbonCommandException] {
+      sql("""ALTER TABLE source CHANGE c2 c2 bigint""").show()
     }
+    assertResult("Alter table add column is not allowed for streaming table")(addColException.getMessage)
+    assertResult("Alter table drop column is not allowed for streaming table")(dropColException.getMessage)
+    assertResult("Alter rename table is not allowed for streaming table")(renameException.getMessage)
+    assertResult("Alter table change datatype is not allowed for streaming table")(changeDataTypeException.getMessage)
   }
 
   override def afterAll {
@@ -180,14 +185,12 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists streaming.stream_table_1s")
     sql("drop table if exists streaming.stream_table_filter ")
     sql("drop table if exists streaming.stream_table_filter_complex")
-    sql("drop table if exists streaming.stream_table_tolerant")
     sql("drop table if exists streaming.stream_table_delete_id")
     sql("drop table if exists streaming.stream_table_delete_date")
     sql("drop table if exists streaming.stream_table_handoff")
     sql("drop table if exists streaming.stream_table_reopen")
     sql("drop table if exists streaming.stream_table_drop")
     sql("drop table if exists streaming.agg_table_block")
-    sql("drop table if exists streaming.agg_table_block_agg0")
   }
 
   // normal table not support streaming ingest
@@ -994,14 +997,6 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       sql("select count(*) from streaming.stream_table_handoff"),
       Seq(Row(2 * 100))
     )
-
-    try {
-      sql("ALTER TABLE stream_table_handoff SET TBLPROPERTIES('streaming'='false')")
-      assert(false, "unsupport disable streaming properties")
-    } catch {
-      case _ =>
-        assert(true)
-    }
   }
 
   test("auto hand off, close and reopen streaming table") {