You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/08/06 10:00:18 UTC

[01/20] incubator-carbondata git commit: [Bug]Fix the exception log about the result of checking csv header (#881)

Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 8552a8222 -> 65c48abef


[Bug]Fix the exception log about the result of checking csv header (#881)

[Bug]Fix the exception log about the result of checking csv header (#881)

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/f2e60332
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/f2e60332
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/f2e60332

Branch: refs/heads/master
Commit: f2e60332fe0cb31fa61c48749d620372269c1aae
Parents: b82a960
Author: Zhangshunyu <zh...@huawei.com>
Authored: Mon Aug 1 16:55:48 2016 +0800
Committer: david <qi...@qq.com>
Committed: Mon Aug 1 16:55:48 2016 +0800

----------------------------------------------------------------------
 .../org/carbondata/processing/csvload/DataGraphExecuter.java  | 7 ++++---
 .../org/carbondata/processing/csvload/GraphExecutionUtil.java | 5 ++++-
 2 files changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f2e60332/processing/src/main/java/org/carbondata/processing/csvload/DataGraphExecuter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/csvload/DataGraphExecuter.java b/processing/src/main/java/org/carbondata/processing/csvload/DataGraphExecuter.java
index 91f5aee..882b1a7 100644
--- a/processing/src/main/java/org/carbondata/processing/csvload/DataGraphExecuter.java
+++ b/processing/src/main/java/org/carbondata/processing/csvload/DataGraphExecuter.java
@@ -460,10 +460,11 @@ public class DataGraphExecuter {
     }
 
     if (count != columnNames.length) {
-      LOGGER.error(
-          "CSV File provided is not proper. Column names in schema and CSV header are not same.");
+      LOGGER.error("CSV header provided in DDL is not proper." +
+          " Column names in schema and CSV header are not the same.");
       throw new DataLoadingException(DataProcessorConstants.CSV_VALIDATION_ERRROR_CODE,
-          "CSV File provided is not proper. Column names in schema and csv header are not same.");
+          "CSV header provided in DDL is not proper. Column names in schema and CSV header are " +
+              "not the same.");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f2e60332/processing/src/main/java/org/carbondata/processing/csvload/GraphExecutionUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/csvload/GraphExecutionUtil.java b/processing/src/main/java/org/carbondata/processing/csvload/GraphExecutionUtil.java
index 27a1f5c..c5fd299 100644
--- a/processing/src/main/java/org/carbondata/processing/csvload/GraphExecutionUtil.java
+++ b/processing/src/main/java/org/carbondata/processing/csvload/GraphExecutionUtil.java
@@ -306,7 +306,10 @@ public final class GraphExecutionUtil {
           count++;
         }
       }
-
+      if (0 == count) {
+        LOGGER.error("There is No proper CSV file header found." +
+            " Either the ddl or the CSV file should provide CSV file header. ");
+      }
       return (count == columnNames.length);
     }
 


[15/20] incubator-carbondata git commit: [CARBONDATA-128] Modification done to read thrift files using Tcompact protocol (#907)

Posted by ra...@apache.org.
[CARBONDATA-128] Modification done to read thrift files using Tcompact protocol (#907)

replace Tbinary protocol with TCompact for performance 

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/89847ea2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/89847ea2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/89847ea2

Branch: refs/heads/master
Commit: 89847ea20308d6685c0d28dbbadd620bffb971b5
Parents: 1721d40
Author: manishgupta88 <to...@gmail.com>
Authored: Thu Aug 4 17:17:16 2016 +0530
Committer: Venkata Ramana G <g....@gmail.com>
Committed: Thu Aug 4 17:17:16 2016 +0530

----------------------------------------------------------------------
 .../main/java/org/carbondata/core/reader/ThriftReader.java    | 7 ++++---
 .../main/java/org/carbondata/core/writer/ThriftWriter.java    | 7 ++++---
 2 files changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/89847ea2/core/src/main/java/org/carbondata/core/reader/ThriftReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/reader/ThriftReader.java b/core/src/main/java/org/carbondata/core/reader/ThriftReader.java
index e659919..92a6be1 100644
--- a/core/src/main/java/org/carbondata/core/reader/ThriftReader.java
+++ b/core/src/main/java/org/carbondata/core/reader/ThriftReader.java
@@ -27,7 +27,8 @@ import org.carbondata.core.util.CarbonUtil;
 
 import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.transport.TIOStreamTransport;
 
 /**
@@ -53,7 +54,7 @@ public class ThriftReader {
   /**
    * For reading the binary thrift objects.
    */
-  private TBinaryProtocol binaryIn;
+  private TProtocol binaryIn;
 
   /**
    * Constructor.
@@ -76,7 +77,7 @@ public class ThriftReader {
   public void open() throws IOException {
     FileFactory.FileType fileType = FileFactory.getFileType(fileName);
     dataInputStream = FileFactory.getDataInputStream(fileName, fileType, bufferSize);
-    binaryIn = new TBinaryProtocol(new TIOStreamTransport(dataInputStream));
+    binaryIn = new TCompactProtocol(new TIOStreamTransport(dataInputStream));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/89847ea2/core/src/main/java/org/carbondata/core/writer/ThriftWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/writer/ThriftWriter.java b/core/src/main/java/org/carbondata/core/writer/ThriftWriter.java
index c2822f4..2c5ee1d 100644
--- a/core/src/main/java/org/carbondata/core/writer/ThriftWriter.java
+++ b/core/src/main/java/org/carbondata/core/writer/ThriftWriter.java
@@ -28,7 +28,8 @@ import org.carbondata.core.util.CarbonUtil;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.transport.TIOStreamTransport;
 
 /**
@@ -54,7 +55,7 @@ public class ThriftWriter {
   /**
    * For binary serialization of objects.
    */
-  private TBinaryProtocol binaryOut;
+  private TProtocol binaryOut;
 
   /**
    * flag to append to existing file
@@ -75,7 +76,7 @@ public class ThriftWriter {
   public void open() throws IOException {
     FileFactory.FileType fileType = FileFactory.getFileType(fileName);
     dataOutputStream = FileFactory.getDataOutputStream(fileName, fileType, bufferSize, append);
-    binaryOut = new TBinaryProtocol(new TIOStreamTransport(dataOutputStream));
+    binaryOut = new TCompactProtocol(new TIOStreamTransport(dataOutputStream));
   }
 
   /**


[20/20] incubator-carbondata git commit: Merge from old repo This closes #60

Posted by ra...@apache.org.
Merge from old repo This closes #60


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/65c48abe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/65c48abe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/65c48abe

Branch: refs/heads/master
Commit: 65c48abef7e3126b19e22bd991a06af69c437664
Parents: 8552a82 1698b1a
Author: ravipesala <ra...@gmail.com>
Authored: Sat Aug 6 15:29:20 2016 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Sat Aug 6 15:29:20 2016 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   4 +
 .../store/impl/DFSFileHolderImpl.java           |   4 +-
 .../datastorage/store/impl/FileFactory.java     |   6 +-
 .../carbondata/core/reader/ThriftReader.java    |   7 +-
 .../CarbonDictionarySortIndexReaderImpl.java    |  43 +-
 .../carbondata/core/writer/ThriftWriter.java    |   7 +-
 .../carbon/datastore/BlockIndexStoreTest.java   | 408 +++++++++----------
 ...CarbonDictionarySortIndexReaderImplTest.java |  15 +-
 ...CarbonDictionarySortIndexWriterImplTest.java |  15 +
 .../allqueries/AllDataTypesTestCase4.scala      |   4 +-
 .../allqueries/AllDataTypesTestCase6.scala      |  16 +-
 .../carbondata/spark/load/CarbonLoaderUtil.java |  28 +-
 .../org/apache/spark/sql/CarbonSqlParser.scala  |   7 +-
 .../execution/command/carbonTableSchema.scala   |  10 +-
 .../spark/rdd/CarbonDataLoadRDD.scala           |  21 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |   2 +-
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala   |  18 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |  21 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    | 254 ++++++------
 .../org/carbondata/spark/rdd/Compactor.scala    |   1 +
 .../carbondata/spark/util/CarbonScalaUtil.scala |  71 +++-
 .../testsuite/bigdecimal/TestBigDecimal.scala   |  59 ++-
 .../MajorCompactionStopsAfterCompaction.scala   |   2 +-
 .../dataretention/DataRetentionTestCase.scala   |  33 +-
 .../processing/csvload/DataGraphExecuter.java   |   7 +-
 .../processing/csvload/GraphExecutionUtil.java  |   5 +-
 .../processing/csvreaderstep/CsvInput.java      |  24 +-
 .../csvreaderstep/UnivocityCsvParser.java       |   1 +
 .../sortdata/IntermediateFileMerger.java        |   5 +-
 .../store/CarbonFactDataHandlerColumnar.java    |  28 +-
 .../csvbased/CarbonCSVBasedSeqGenStep.java      |  23 +-
 31 files changed, 660 insertions(+), 489 deletions(-)
----------------------------------------------------------------------



[02/20] incubator-carbondata git commit: [CARBONDATA-127] Issue while type casting data read from sort temp file to big decimal type (#893)

Posted by ra...@apache.org.
[CARBONDATA-127] Issue while type casting data read from sort temp file to big decimal type (#893)

Analysis: Whenever we perform data load operation involving huge data with decimal datatypes, then during intermediate merging pf sort temp file we are trying to typecast byte array to big decimal value after reading from object array. During this operation typecast exception is thrown.

Impact area: data load flow with huge data and measures with big decimal datatype

Fix: Typescast the object array value to byte array instaed of bigdecimal

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/1695d606
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/1695d606
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/1695d606

Branch: refs/heads/master
Commit: 1695d606eb3f770a7fd6003280b15ed0e520d6f0
Parents: f2e6033
Author: manishgupta88 <to...@gmail.com>
Authored: Mon Aug 1 16:15:37 2016 +0530
Committer: Kumar Vishal <ku...@gmail.com>
Committed: Mon Aug 1 16:15:37 2016 +0530

----------------------------------------------------------------------
 .../carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala | 6 ++++++
 .../sortandgroupby/sortdata/IntermediateFileMerger.java        | 5 +----
 2 files changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1695d606/integration/spark/src/test/scala/org/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala
index fb28568..2f6b9f8 100644
--- a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala
+++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala
@@ -37,6 +37,8 @@ class TestBigDecimal extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists hiveBigDecimal")
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.SORT_SIZE, "1")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT, "2")
     sql("CREATE TABLE IF NOT EXISTS carbonTable (ID Int, date Timestamp, country String, name String, phonetype String, serialname String, salary Decimal(17,2))STORED BY 'org.apache.carbondata.format'")
     sql("create table if not exists hiveTable(ID Int, date Timestamp, country String, name String, phonetype String, serialname String, salary Decimal(17,2))row format delimited fields terminated by ','")
     sql("LOAD DATA LOCAL INPATH './src/test/resources/decimalDataWithHeader.csv' into table carbonTable")
@@ -165,6 +167,10 @@ class TestBigDecimal extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists hiveBigDecimal")
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.SORT_SIZE,
+      CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT,
+      CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT_DEFAULT_VALUE)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1695d606/processing/src/main/java/org/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
index 779b4e8..56f6414 100644
--- a/processing/src/main/java/org/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
+++ b/processing/src/main/java/org/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
@@ -25,7 +25,6 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.math.BigDecimal;
 import java.util.AbstractQueue;
 import java.util.PriorityQueue;
 import java.util.concurrent.Callable;
@@ -35,7 +34,6 @@ import org.carbondata.common.logging.LogServiceFactory;
 import org.carbondata.core.constants.CarbonCommonConstants;
 import org.carbondata.core.util.CarbonUtil;
 import org.carbondata.core.util.CarbonUtilException;
-import org.carbondata.core.util.DataTypeUtil;
 import org.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
 import org.carbondata.processing.util.RemoveDictionaryUtil;
 
@@ -341,8 +339,7 @@ public class IntermediateFileMerger implements Callable<Void> {
             Long val = (Long) RemoveDictionaryUtil.getMeasure(fieldIndex, row);
             stream.writeLong(val);
           } else if (aggType[counter] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-            BigDecimal val = (BigDecimal) RemoveDictionaryUtil.getMeasure(fieldIndex, row);
-            byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val);
+            byte[] bigDecimalInBytes = (byte[]) RemoveDictionaryUtil.getMeasure(fieldIndex, row);
             stream.writeInt(bigDecimalInBytes.length);
             stream.write(bigDecimalInBytes);
           }


[13/20] incubator-carbondata git commit: [CARBONDATA-138] Avg aggregation for decimal type keeping sync with hive (#900)

Posted by ra...@apache.org.
[CARBONDATA-138] Avg aggregation for decimal type keeping sync with hive (#900)

scale up the decimal value during average aggregation

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/07fe4d21
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/07fe4d21
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/07fe4d21

Branch: refs/heads/master
Commit: 07fe4d2132f62ffaa3eac5a67315ffb6183ee45a
Parents: a724831
Author: Gin-zhj <zh...@huawei.com>
Authored: Thu Aug 4 18:40:43 2016 +0800
Committer: Venkata Ramana G <g....@gmail.com>
Committed: Thu Aug 4 16:10:43 2016 +0530

----------------------------------------------------------------------
 .../impl/AvgBigDecimalAggregator.java           | 12 ++++-
 .../carbondata/spark/agg/CarbonAggregates.scala | 12 +++--
 .../testsuite/bigdecimal/TestBigDecimal.scala   | 53 +++++++++++++++-----
 3 files changed, 59 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/07fe4d21/core/src/main/java/org/carbondata/query/aggregator/impl/AvgBigDecimalAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/AvgBigDecimalAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/AvgBigDecimalAggregator.java
index 8c67cfc..2c5e59b 100644
--- a/core/src/main/java/org/carbondata/query/aggregator/impl/AvgBigDecimalAggregator.java
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/AvgBigDecimalAggregator.java
@@ -25,11 +25,15 @@ import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 
+import static java.lang.Math.min;
+
 import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
 import org.carbondata.core.constants.CarbonCommonConstants;
 import org.carbondata.core.util.DataTypeUtil;
 import org.carbondata.query.aggregator.MeasureAggregator;
 
+import org.apache.spark.sql.types.DecimalType;
+
 public class AvgBigDecimalAggregator extends AbstractMeasureAggregatorBasic {
 
   /**
@@ -115,7 +119,9 @@ public class AvgBigDecimalAggregator extends AbstractMeasureAggregatorBasic {
    * @return average aggregate value
    */
   @Override public BigDecimal getBigDecimalValue() {
-    return aggVal.divide(new BigDecimal(count), 6);
+    // increase scale to avoid any precision lost in the data
+    int updatedScale = min(aggVal.scale() + 4, DecimalType.MAX_SCALE());
+    return aggVal.divide(new BigDecimal(count), updatedScale, BigDecimal.ROUND_HALF_EVEN);
   }
 
   /**
@@ -139,7 +145,9 @@ public class AvgBigDecimalAggregator extends AbstractMeasureAggregatorBasic {
    * @return average value as an object
    */
   @Override public Object getValueObject() {
-    return aggVal.divide(new BigDecimal(count));
+    // increase scale to avoid any precision lost in the data
+    int updatedScale = min(aggVal.scale() + 4, DecimalType.MAX_SCALE());
+    return aggVal.divide(new BigDecimal(count), updatedScale, BigDecimal.ROUND_HALF_EVEN);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/07fe4d21/integration/spark/src/main/scala/org/carbondata/spark/agg/CarbonAggregates.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/agg/CarbonAggregates.scala b/integration/spark/src/main/scala/org/carbondata/spark/agg/CarbonAggregates.scala
index 50872ff..c3336b2 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/agg/CarbonAggregates.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/agg/CarbonAggregates.scala
@@ -17,9 +17,8 @@
 
 package org.carbondata.spark.agg
 
-import java.math.BigDecimal
-
 import scala.language.implicitConversions
+import scala.math.min
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -116,7 +115,14 @@ case class AverageCarbon(child: Expression, castedDataType: DataType = null)
       AverageCarbonFinal(partialSum.toAttribute,
         child.dataType match {
           case IntegerType | StringType | LongType | TimestampType => DoubleType
-          case _ => CarbonScalaUtil.updateDataType(child.dataType)
+          case decimal: DecimalType =>
+            val precision = decimal.asInstanceOf[DecimalType].precision
+            val scale = decimal.asInstanceOf[DecimalType].scale
+            // increase precision and scale to avoid any precision lost in the data
+            val updatedPrecision = min(precision + 4, DecimalType.MAX_PRECISION)
+            val updatedScale = min(scale + 4, DecimalType.MAX_SCALE)
+            DecimalType(updatedPrecision, updatedScale)
+          case _ => child.dataType
         }),
       partialSum :: Nil)
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/07fe4d21/integration/spark/src/test/scala/org/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala
index 2f6b9f8..ab62277 100644
--- a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala
+++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala
@@ -35,6 +35,7 @@ class TestBigDecimal extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists carbonTable")
     sql("drop table if exists hiveTable")
     sql("drop table if exists hiveBigDecimal")
+    sql("drop table if exists carbonBigDecimal_2")
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.SORT_SIZE, "1")
@@ -45,6 +46,8 @@ class TestBigDecimal extends QueryTest with BeforeAndAfterAll {
     sql("LOAD DATA local inpath './src/test/resources/decimalDataWithoutHeader.csv' INTO table hiveTable")
     sql("create table if not exists hiveBigDecimal(ID Int, date Timestamp, country String, name String, phonetype String, serialname String, salary decimal(27, 10))row format delimited fields terminated by ','")
     sql("LOAD DATA local inpath './src/test/resources/decimalBoundaryDataHive.csv' INTO table hiveBigDecimal")
+    sql("create table if not exists carbonBigDecimal_2 (ID Int, date Timestamp, country String, name String, phonetype String, serialname String, salary decimal(30, 10)) STORED BY 'org.apache.carbondata.format'")
+    sql("LOAD DATA LOCAL INPATH './src/test/resources/decimalBoundaryDataCarbon.csv' into table carbonBigDecimal_2")
   }
 
   test("test detail query on big decimal column") {
@@ -139,32 +142,56 @@ class TestBigDecimal extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists decimalDictLookUp")
   }
 
-  test("test sum aggregation on big decimal column with high precision") {
-    sql("drop table if exists carbonBigDecimal")
-    sql("create table if not exists carbonBigDecimal (ID Int, date Timestamp, country String, name String, phonetype String, serialname String, salary decimal(30, 10)) STORED BY 'org.apache.carbondata.format'")
-    sql("LOAD DATA LOCAL INPATH './src/test/resources/decimalBoundaryDataCarbon.csv' into table carbonBigDecimal")
-
-    checkAnswer(sql("select sum(salary)+10 from carbonBigDecimal"),
+  test("test sum+10 aggregation on big decimal column with high precision") {
+    checkAnswer(sql("select sum(salary)+10 from carbonBigDecimal_2"),
       sql("select sum(salary)+10 from hiveBigDecimal"))
+  }
 
-    sql("drop table if exists carbonBigDecimal")
+  test("test sum*10 aggregation on big decimal column with high precision") {
+    checkAnswer(sql("select sum(salary)*10 from carbonBigDecimal_2"),
+      sql("select sum(salary)*10 from hiveBigDecimal"))
   }
 
-  test("test sum-distinct aggregation on big decimal column with high precision") {
-    sql("drop table if exists carbonBigDecimal")
-    sql("create table if not exists carbonBigDecimal (ID Int, date Timestamp, country String, name String, phonetype String, serialname String, salary decimal(30, 10)) STORED BY 'org.apache.carbondata.format'")
-    sql("LOAD DATA LOCAL INPATH './src/test/resources/decimalBoundaryDataCarbon.csv' into table carbonBigDecimal")
+  test("test sum/10 aggregation on big decimal column with high precision") {
+    checkAnswer(sql("select sum(salary)/10 from carbonBigDecimal_2"),
+      sql("select sum(salary)/10 from hiveBigDecimal"))
+  }
 
-    checkAnswer(sql("select sum(distinct(salary))+10 from carbonBigDecimal"),
+  test("test sum-distinct+10 aggregation on big decimal column with high precision") {
+    checkAnswer(sql("select sum(distinct(salary))+10 from carbonBigDecimal_2"),
       sql("select sum(distinct(salary))+10 from hiveBigDecimal"))
+  }
 
-    sql("drop table if exists carbonBigDecimal")
+  test("test sum-distinct*10 aggregation on big decimal column with high precision") {
+    checkAnswer(sql("select sum(distinct(salary))*10 from carbonBigDecimal_2"),
+      sql("select sum(distinct(salary))*10 from hiveBigDecimal"))
+  }
+
+  test("test sum-distinct/10 aggregation on big decimal column with high precision") {
+    checkAnswer(sql("select sum(distinct(salary))/10 from carbonBigDecimal_2"),
+      sql("select sum(distinct(salary))/10 from hiveBigDecimal"))
+  }
+
+  test("test avg+10 aggregation on big decimal column with high precision") {
+    checkAnswer(sql("select avg(salary)+10 from carbonBigDecimal_2"),
+      sql("select avg(salary)+10 from hiveBigDecimal"))
+  }
+
+  test("test avg*10 aggregation on big decimal column with high precision") {
+    checkAnswer(sql("select avg(salary)*10 from carbonBigDecimal_2"),
+      sql("select avg(salary)*10 from hiveBigDecimal"))
+  }
+
+  test("test avg/10 aggregation on big decimal column with high precision") {
+    checkAnswer(sql("select avg(salary)/10 from carbonBigDecimal_2"),
+      sql("select avg(salary)/10 from hiveBigDecimal"))
   }
 
   override def afterAll {
     sql("drop table if exists carbonTable")
     sql("drop table if exists hiveTable")
     sql("drop table if exists hiveBigDecimal")
+    sql("drop table if exists carbonBigDecimal_2")
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.SORT_SIZE,


[16/20] incubator-carbondata git commit: Merge remote-tracking branch 'HuaweiBigData/master' into apache/master

Posted by ra...@apache.org.
Merge remote-tracking branch 'HuaweiBigData/master' into apache/master

Conflicts:
	core/src/main/java/org/carbondata/query/aggregator/impl/AvgBigDecimalAggregator.java
	core/src/main/java/org/carbondata/query/carbon/executor/internal/impl/InternalDetailQueryExecutor.java
	core/src/main/java/org/carbondata/query/carbon/result/iterator/AbstractDetailQueryResultIterator.java
	integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
	integration/spark/src/main/scala/org/carbondata/spark/agg/CarbonAggregates.scala
	integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
	integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala
	integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala
	integration/spark/src/test/scala/org/carbondata/spark/testsuite/joinquery/EquiJoinTestCase.scala


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/97598381
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/97598381
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/97598381

Branch: refs/heads/master
Commit: 975983816666c07b401fdc75b9729c3fabe61e88
Parents: 29f9cf2 89847ea
Author: ravipesala <ra...@gmail.com>
Authored: Thu Aug 4 19:28:37 2016 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Aug 4 19:28:37 2016 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   4 +
 .../store/impl/DFSFileHolderImpl.java           |   4 +-
 .../datastorage/store/impl/FileFactory.java     |   6 +-
 .../carbondata/core/reader/ThriftReader.java    |   7 +-
 .../CarbonDictionarySortIndexReaderImpl.java    |  43 +++-
 .../carbondata/core/writer/ThriftWriter.java    |   7 +-
 ...CarbonDictionarySortIndexReaderImplTest.java |  15 +-
 ...CarbonDictionarySortIndexWriterImplTest.java |  15 ++
 .../allqueries/AllDataTypesTestCase4.scala      |   4 +-
 .../allqueries/AllDataTypesTestCase6.scala      |  16 +-
 .../carbondata/spark/load/CarbonLoaderUtil.java |  28 +-
 .../org/apache/spark/sql/CarbonSqlParser.scala  |   7 +-
 .../execution/command/carbonTableSchema.scala   |  10 +-
 .../spark/rdd/CarbonDataLoadRDD.scala           |  21 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |   2 +-
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala   |  18 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |  21 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    | 254 +++++++++----------
 .../org/carbondata/spark/rdd/Compactor.scala    |   1 +
 .../carbondata/spark/util/CarbonScalaUtil.scala |  50 +++-
 .../testsuite/bigdecimal/TestBigDecimal.scala   |  59 ++++-
 .../MajorCompactionStopsAfterCompaction.scala   |   2 +-
 .../dataretention/DataRetentionTestCase.scala   |  33 ++-
 .../processing/csvload/DataGraphExecuter.java   |   7 +-
 .../processing/csvload/GraphExecutionUtil.java  |   5 +-
 .../processing/csvreaderstep/CsvInput.java      |  24 +-
 .../csvreaderstep/UnivocityCsvParser.java       |   1 +
 .../sortdata/IntermediateFileMerger.java        |   5 +-
 .../store/CarbonFactDataHandlerColumnar.java    |  28 +-
 .../csvbased/CarbonCSVBasedSeqGenStep.java      |  23 +-
 30 files changed, 450 insertions(+), 270 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase6.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
index 22b6021,e0ccd2b..6a460e8
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
@@@ -1288,37 -1308,35 +1287,37 @@@ class CarbonSqlParser(
    }
  
    protected lazy val showLoads: Parser[LogicalPlan] =
-     SHOW ~> (LOADS|SEGMENTS) ~> FOR ~> TABLE ~> (ident <~ ".").? ~ ident ~
+     SHOW ~> SEGMENTS ~> FOR ~> TABLE ~> (ident <~ ".").? ~ ident ~
        (LIMIT ~> numericLit).? <~
        opt(";") ^^ {
 -      case schemaName ~ cubeName ~ limit =>
 -        ShowLoadsCommand(schemaName, cubeName.toLowerCase(), limit)
 +      case databaseName ~ tableName ~ limit =>
 +        ShowLoadsCommand(databaseName, tableName.toLowerCase(), limit)
      }
  
    protected lazy val segmentId: Parser[String] =
 -    ( numericLit ^^ { u => u } |
 -      elem("decimal", _.isInstanceOf[lexical.FloatLit]) ^^ (_.chars)
 -      )
 +    numericLit ^^ { u => u } |
 +      elem("decimal", p => {
 +        p.getClass.getSimpleName.equals("FloatLit") ||
 +        p.getClass.getSimpleName.equals("DecimalLit") } ) ^^ (_.chars)
  
    protected lazy val deleteLoadsByID: Parser[LogicalPlan] =
-     DELETE ~> (LOAD|SEGMENT) ~> repsep(segmentId, ",") ~ (FROM ~> TABLE ~>
+     DELETE ~> SEGMENT ~> repsep(segmentId, ",") ~ (FROM ~> TABLE ~>
        (ident <~ ".").? ~ ident) <~
        opt(";") ^^ {
 -      case loadids ~ cube => cube match {
 -        case schemaName ~ cubeName => DeleteLoadsById(loadids, schemaName, cubeName.toLowerCase())
 +      case loadids ~ table => table match {
 +        case databaseName ~ tableName =>
 +          DeleteLoadsById(loadids, databaseName, tableName.toLowerCase())
        }
      }
  
    protected lazy val deleteLoadsByLoadDate: Parser[LogicalPlan] =
-     DELETE ~> (LOADS|SEGMENTS) ~> FROM ~> TABLE ~> (ident <~ ".").? ~ ident ~
+     DELETE ~> SEGMENTS ~> FROM ~> TABLE ~> (ident <~ ".").? ~ ident ~
        (WHERE ~> (STARTTIME <~ BEFORE) ~ stringLit) <~
        opt(";") ^^ {
 -      case schema ~ cube ~ condition =>
 +      case schema ~ table ~ condition =>
          condition match {
            case dateField ~ dateValue =>
 -            DeleteLoadsByLoadDate(schema, cube.toLowerCase(), dateField, dateValue)
 +            DeleteLoadsByLoadDate(schema, table.toLowerCase(), dateField, dateValue)
          }
      }
  

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index d19d1c5,cc08604..11358fc
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@@ -798,16 -1225,13 +798,13 @@@ private[sql] case class AlterTableCompa
      val dataLoadSchema = new CarbonDataLoadSchema(table)
      // Need to fill dimension relation
      carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
 -    carbonLoadModel.setTableName(relation.cubeMeta.carbonTableIdentifier.getTableName)
 -    carbonLoadModel.setDatabaseName(relation.cubeMeta.carbonTableIdentifier.getDatabaseName)
 -    carbonLoadModel.setStorePath(relation.cubeMeta.storePath)
 +    carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
 +    carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
 +    carbonLoadModel.setStorePath(relation.tableMeta.storePath)
  
 -    val partitioner = relation.cubeMeta.partitioner
 +    val partitioner = relation.tableMeta.partitioner
+     val kettleHomePath = CarbonScalaUtil.getKettleHome(sqlContext)
  
-     var kettleHomePath = CarbonScalaUtil.getKettleHomePath(sqlContext)
-     if (kettleHomePath == null) {
-       sys.error(s"carbon.kettle.home is not set")
-     }
      var storeLocation = CarbonProperties.getInstance
        .getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
          System.getProperty("java.io.tmpdir")

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala
index 5a1dedc,0000000..144af04
mode 100644,000000..100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala
@@@ -1,290 -1,0 +1,290 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.carbondata.spark.rdd
 +
 +import java.util
 +
 +import scala.collection.JavaConverters._
 +import scala.reflect.ClassTag
 +
 +import org.apache.hadoop.conf.Configuration
 +import org.apache.hadoop.mapreduce.Job
 +import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
 +import org.apache.spark.rdd.RDD
 +import org.apache.spark.sql.hive.DistributionUtil
 +
 +import org.carbondata.common.CarbonIterator
 +import org.carbondata.common.logging.LogServiceFactory
 +import org.carbondata.core.cache.dictionary.Dictionary
 +import org.carbondata.core.carbon.datastore.block.{Distributable, TableBlockInfo}
 +import org.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryStatisticsRecorder}
 +import org.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit}
 +import org.carbondata.scan.executor.QueryExecutorFactory
 +import org.carbondata.scan.expression.Expression
 +import org.carbondata.scan.model.QueryModel
 +import org.carbondata.scan.result.BatchResult
 +import org.carbondata.scan.result.iterator.ChunkRowIterator
 +import org.carbondata.spark.RawValue
 +import org.carbondata.spark.load.CarbonLoaderUtil
 +import org.carbondata.spark.util.QueryPlanUtil
 +
 +class CarbonSparkPartition(rddId: Int, val idx: Int,
-   val locations: Array[String],
-   val tableBlockInfos: util.List[TableBlockInfo])
++    val locations: Array[String],
++    val tableBlockInfos: util.List[TableBlockInfo])
 +  extends Partition {
 +
 +  override val index: Int = idx
 +
 +  // val serializableHadoopSplit = new SerializableWritable[Array[String]](locations)
 +  override def hashCode(): Int = {
 +    41 * (41 + rddId) + idx
 +  }
 +}
 +
-  /**
-   * This RDD is used to perform query on CarbonData file. Before sending tasks to scan
-   * CarbonData file, this RDD will leverage CarbonData's index information to do CarbonData file
-   * level filtering in driver side.
-   */
++/**
++ * This RDD is used to perform query on CarbonData file. Before sending tasks to scan
++ * CarbonData file, this RDD will leverage CarbonData's index information to do CarbonData file
++ * level filtering in driver side.
++ */
 +class CarbonScanRDD[V: ClassTag](
-   sc: SparkContext,
-   queryModel: QueryModel,
-   filterExpression: Expression,
-   keyClass: RawValue[V],
-   @transient conf: Configuration,
-   tableCreationTime: Long,
-   schemaLastUpdatedTime: Long,
-   baseStoreLocation: String)
++    sc: SparkContext,
++    queryModel: QueryModel,
++    filterExpression: Expression,
++    keyClass: RawValue[V],
++    @transient conf: Configuration,
++    tableCreationTime: Long,
++    schemaLastUpdatedTime: Long,
++    baseStoreLocation: String)
 +  extends RDD[V](sc, Nil) with Logging {
 +
 +  val defaultParallelism = sc.defaultParallelism
 +
 +  override def getPartitions: Array[Partition] = {
 +    val statisticRecorder = new QueryStatisticsRecorder(queryModel.getQueryId)
 +    val startTime = System.currentTimeMillis()
 +    val (carbonInputFormat: CarbonInputFormat[Array[Object]], job: Job) =
 +      QueryPlanUtil.createCarbonInputFormat(queryModel.getAbsoluteTableIdentifier)
 +
 +    val result = new util.ArrayList[Partition](defaultParallelism)
 +    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 +    // set filter resolver tree
 +    try {
 +      // before applying filter check whether segments are available in the table.
 +      val splits = carbonInputFormat.getSplits(job)
 +      if (!splits.isEmpty) {
 +        val filterResolver = carbonInputFormat
 +          .getResolvedFilter(job.getConfiguration, filterExpression)
 +        CarbonInputFormat.setFilterPredicates(job.getConfiguration, filterResolver)
 +        queryModel.setFilterExpressionResolverTree(filterResolver)
 +      }
 +    }
 +    catch {
 +      case e: Exception =>
 +        LOGGER.error(e)
 +        sys.error("Exception occurred in query execution :: " + e.getMessage)
 +    }
 +    // get splits
 +    val splits = carbonInputFormat.getSplits(job)
 +    if (!splits.isEmpty) {
 +      val carbonInputSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
 +
 +      val blockList = carbonInputSplits.map(inputSplit =>
 +        new TableBlockInfo(inputSplit.getPath.toString,
 +          inputSplit.getStart, inputSplit.getSegmentId,
 +          inputSplit.getLocations, inputSplit.getLength
 +        ).asInstanceOf[Distributable]
 +      )
 +      if (blockList.nonEmpty) {
 +        // group blocks to nodes, tasks
 +        val startTime = System.currentTimeMillis
 +        var statistic = new QueryStatistic
 +        val activeNodes = DistributionUtil
 +          .ensureExecutorsAndGetNodeList(blockList.toArray, sparkContext)
 +        val nodeBlockMapping =
 +          CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1, defaultParallelism,
 +            activeNodes.toList.asJava
 +          )
 +        val timeElapsed: Long = System.currentTimeMillis - startTime
 +        statistic.addStatistics("Total Time taken in block(s) allocation", System.currentTimeMillis)
 +        statisticRecorder.recordStatistics(statistic);
 +        statistic = new QueryStatistic
 +        var i = 0
 +        // Create Spark Partition for each task and assign blocks
 +        nodeBlockMapping.asScala.foreach { entry =>
 +          entry._2.asScala.foreach { blocksPerTask => {
 +            val tableBlockInfo = blocksPerTask.asScala.map(_.asInstanceOf[TableBlockInfo])
 +            if (blocksPerTask.size() != 0) {
 +              result
 +                .add(new CarbonSparkPartition(id, i, Seq(entry._1).toArray, tableBlockInfo.asJava))
 +              i += 1
 +            }
 +          }
 +          }
 +        }
 +        val noOfBlocks = blockList.size
 +        val noOfNodes = nodeBlockMapping.size
 +        val noOfTasks = result.size()
 +        logInfo(s"Identified  no.of.Blocks: $noOfBlocks,"
 +                + s"parallelism: $defaultParallelism , " +
 +                s"no.of.nodes: $noOfNodes, no.of.tasks: $noOfTasks"
 +        )
 +        statistic.addStatistics("Time taken to identify Block(s) to scan", System.currentTimeMillis)
 +        statisticRecorder.recordStatistics(statistic);
 +        statisticRecorder.logStatistics
 +        result.asScala.foreach { r =>
 +          val cp = r.asInstanceOf[CarbonSparkPartition]
 +          logInfo(s"Node : " + cp.locations.toSeq.mkString(",")
 +                  + ", No.Of Blocks : " + cp.tableBlockInfos.size()
 +          )
 +        }
 +      } else {
 +        logInfo("No blocks identified to scan")
 +        val nodesPerBlock = new util.ArrayList[TableBlockInfo]()
 +        result.add(new CarbonSparkPartition(id, 0, Seq("").toArray, nodesPerBlock))
 +      }
 +    }
 +    else {
 +      logInfo("No valid segments found to scan")
 +      val nodesPerBlock = new util.ArrayList[TableBlockInfo]()
 +      result.add(new CarbonSparkPartition(id, 0, Seq("").toArray, nodesPerBlock))
 +    }
 +    result.toArray(new Array[Partition](result.size()))
 +  }
 +
-    override def compute(thepartition: Partition, context: TaskContext): Iterator[V] = {
-      val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-      val iter = new Iterator[V] {
-        var rowIterator: CarbonIterator[Array[Any]] = _
-        var queryStartTime: Long = 0
-        try {
-          val carbonSparkPartition = thepartition.asInstanceOf[CarbonSparkPartition]
-          if(!carbonSparkPartition.tableBlockInfos.isEmpty) {
-            queryModel.setQueryId(queryModel.getQueryId + "_" + carbonSparkPartition.idx)
-            // fill table block info
-            queryModel.setTableBlockInfos(carbonSparkPartition.tableBlockInfos)
-            queryStartTime = System.currentTimeMillis
++  override def compute(thepartition: Partition, context: TaskContext): Iterator[V] = {
++    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
++    val iter = new Iterator[V] {
++      var rowIterator: CarbonIterator[Array[Any]] = _
++      var queryStartTime: Long = 0
++      try {
++        val carbonSparkPartition = thepartition.asInstanceOf[CarbonSparkPartition]
++        if(!carbonSparkPartition.tableBlockInfos.isEmpty) {
++          queryModel.setQueryId(queryModel.getQueryId + "_" + carbonSparkPartition.idx)
++          // fill table block info
++          queryModel.setTableBlockInfos(carbonSparkPartition.tableBlockInfos)
++          queryStartTime = System.currentTimeMillis
 +
-            val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
-            logInfo("*************************" + carbonPropertiesFilePath)
-            if (null == carbonPropertiesFilePath) {
-              System.setProperty("carbon.properties.filepath",
-                System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties")
-            }
-            // execute query
-            rowIterator = new ChunkRowIterator(
-              QueryExecutorFactory.getQueryExecutor(queryModel).execute(queryModel).
-                asInstanceOf[CarbonIterator[BatchResult]]).asInstanceOf[CarbonIterator[Array[Any]]]
++          val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
++          logInfo("*************************" + carbonPropertiesFilePath)
++          if (null == carbonPropertiesFilePath) {
++            System.setProperty("carbon.properties.filepath",
++              System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties")
++          }
++          // execute query
++          rowIterator = new ChunkRowIterator(
++            QueryExecutorFactory.getQueryExecutor(queryModel).execute(queryModel).
++              asInstanceOf[CarbonIterator[BatchResult]]).asInstanceOf[CarbonIterator[Array[Any]]]
 +
-          }
-        } catch {
-          case e: Exception =>
-            LOGGER.error(e)
-            if (null != e.getMessage) {
-              sys.error("Exception occurred in query execution :: " + e.getMessage)
-            } else {
-              sys.error("Exception occurred in query execution.Please check logs.")
-            }
-        }
++        }
++      } catch {
++        case e: Exception =>
++          LOGGER.error(e)
++          if (null != e.getMessage) {
++            sys.error("Exception occurred in query execution :: " + e.getMessage)
++          } else {
++            sys.error("Exception occurred in query execution.Please check logs.")
++          }
++      }
 +
-        var havePair = false
-        var finished = false
-        var recordCount = 0
++      var havePair = false
++      var finished = false
++      var recordCount = 0
 +
-        override def hasNext: Boolean = {
-          if (!finished && !havePair) {
-            finished = (null == rowIterator) || (!rowIterator.hasNext)
-            havePair = !finished
-          }
-          if (finished) {
-            clearDictionaryCache(queryModel.getColumnToDictionaryMapping)
-            if (null != queryModel.getStatisticsRecorder) {
-              val queryStatistic = new QueryStatistic
-              queryStatistic
-                .addStatistics("Total Time taken to execute the query in executor Side",
-                  System.currentTimeMillis - queryStartTime
-                )
-              queryModel.getStatisticsRecorder.recordStatistics(queryStatistic);
-              queryModel.getStatisticsRecorder.logStatistics();
-            }
-          }
-          !finished
-        }
++      override def hasNext: Boolean = {
++        if (!finished && !havePair) {
++          finished = (null == rowIterator) || (!rowIterator.hasNext)
++          havePair = !finished
++        }
++        if (finished) {
++          clearDictionaryCache(queryModel.getColumnToDictionaryMapping)
++          if (null != queryModel.getStatisticsRecorder) {
++            val queryStatistic = new QueryStatistic
++            queryStatistic
++              .addFixedTimeStatistic("Total Time taken to execute the query in executor Side",
++                System.currentTimeMillis - queryStartTime
++              )
++            queryModel.getStatisticsRecorder.recordStatistics(queryStatistic);
++            queryModel.getStatisticsRecorder.logStatistics();
++          }
++        }
++        !finished
++      }
 +
-        override def next(): V = {
-          if (!hasNext) {
-            throw new java.util.NoSuchElementException("End of stream")
-          }
-          havePair = false
-          recordCount += 1
-          if (queryModel.getLimit != -1 && recordCount >= queryModel.getLimit) {
-            clearDictionaryCache(queryModel.getColumnToDictionaryMapping)
-            if (null != queryModel.getStatisticsRecorder) {
-              val queryStatistic = new QueryStatistic
-              queryStatistic
-                .addStatistics("Total Time taken to execute the query in executor Side",
-                  System.currentTimeMillis - queryStartTime
-                )
-              queryModel.getStatisticsRecorder.recordStatistics(queryStatistic);
-              queryModel.getStatisticsRecorder.logStatistics();
-            }
-          }
-          keyClass.getValue(rowIterator.next())
-        }
-        def clearDictionaryCache(columnToDictionaryMap: java.util.Map[String, Dictionary]) = {
-          if (null != columnToDictionaryMap) {
-            org.carbondata.spark.util.CarbonQueryUtil
-              .clearColumnDictionaryCache(columnToDictionaryMap)
-          }
-        }
-      }
-      iter
-    }
++      override def next(): V = {
++        if (!hasNext) {
++          throw new java.util.NoSuchElementException("End of stream")
++        }
++        havePair = false
++        recordCount += 1
++        if (queryModel.getLimit != -1 && recordCount >= queryModel.getLimit) {
++          clearDictionaryCache(queryModel.getColumnToDictionaryMapping)
++          if (null != queryModel.getStatisticsRecorder) {
++            val queryStatistic = new QueryStatistic
++            queryStatistic
++              .addFixedTimeStatistic("Total Time taken to execute the query in executor Side",
++                System.currentTimeMillis - queryStartTime
++              )
++            queryModel.getStatisticsRecorder.recordStatistics(queryStatistic);
++            queryModel.getStatisticsRecorder.logStatistics();
++          }
++        }
++        keyClass.getValue(rowIterator.next())
++      }
++      def clearDictionaryCache(columnToDictionaryMap: java.util.Map[String, Dictionary]) = {
++        if (null != columnToDictionaryMap) {
++          org.carbondata.spark.util.CarbonQueryUtil
++            .clearColumnDictionaryCache(columnToDictionaryMap)
++        }
++      }
++    }
++    iter
++  }
 +
-    /**
-     * Get the preferred locations where to launch this task.
-     */
-    override def getPreferredLocations(partition: Partition): Seq[String] = {
-      val theSplit = partition.asInstanceOf[CarbonSparkPartition]
-      val firstOptionLocation = theSplit.locations.filter(_ != "localhost")
-      val tableBlocks = theSplit.tableBlockInfos
-      // node name and count mapping
-      val blockMap = new util.LinkedHashMap[String, Integer]()
++  /**
++   * Get the preferred locations where to launch this task.
++   */
++  override def getPreferredLocations(partition: Partition): Seq[String] = {
++    val theSplit = partition.asInstanceOf[CarbonSparkPartition]
++    val firstOptionLocation = theSplit.locations.filter(_ != "localhost")
++    val tableBlocks = theSplit.tableBlockInfos
++    // node name and count mapping
++    val blockMap = new util.LinkedHashMap[String, Integer]()
 +
-      tableBlocks.asScala.foreach(tableBlock => tableBlock.getLocations.foreach(
-        location => {
-          if (!firstOptionLocation.exists(location.equalsIgnoreCase(_))) {
-            val currentCount = blockMap.get(location)
-            if (currentCount == null) {
-              blockMap.put(location, 1)
-            } else {
-              blockMap.put(location, currentCount + 1)
-            }
-          }
-        }
-      )
-      )
++    tableBlocks.asScala.foreach(tableBlock => tableBlock.getLocations.foreach(
++      location => {
++        if (!firstOptionLocation.exists(location.equalsIgnoreCase(_))) {
++          val currentCount = blockMap.get(location)
++          if (currentCount == null) {
++            blockMap.put(location, 1)
++          } else {
++            blockMap.put(location, currentCount + 1)
++          }
++        }
++      }
++    )
++    )
 +
-      val sortedList = blockMap.entrySet().asScala.toSeq.sortWith((nodeCount1, nodeCount2) => {
-        nodeCount1.getValue > nodeCount2.getValue
-      }
-      )
++    val sortedList = blockMap.entrySet().asScala.toSeq.sortWith((nodeCount1, nodeCount2) => {
++      nodeCount1.getValue > nodeCount2.getValue
++    }
++    )
 +
-      val sortedNodesList = sortedList.map(nodeCount => nodeCount.getKey).take(2)
-      firstOptionLocation ++ sortedNodesList
-    }
++    val sortedNodesList = sortedList.map(nodeCount => nodeCount.getKey).take(2)
++    firstOptionLocation ++ sortedNodesList
++  }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala
index 5406e77,87dd0ce..8b91745
--- a/integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala
@@@ -17,9 -17,13 +17,12 @@@
  
  package org.carbondata.spark.util
  
+ import java.io.File
+ 
  import scala.collection.JavaConverters._
  
+ import org.apache.spark.Logging
  import org.apache.spark.sql._
 -import org.apache.spark.sql.execution.command.Level
  import org.apache.spark.sql.hive.{CarbonMetaData, DictionaryMap}
  import org.apache.spark.sql.types._
  
@@@ -27,9 -31,11 +30,10 @@@ import org.carbondata.core.carbon.metad
  import org.carbondata.core.carbon.metadata.encoder.Encoding
  import org.carbondata.core.carbon.metadata.schema.table.CarbonTable
  import org.carbondata.core.constants.CarbonCommonConstants
+ import org.carbondata.core.datastorage.store.impl.FileFactory
 -import org.carbondata.core.util.CarbonProperties
 -import org.carbondata.query.expression.{DataType => CarbonDataType}
 +import org.carbondata.core.util.{CarbonProperties, CarbonUtil}
  
- object CarbonScalaUtil {
+ object CarbonScalaUtil extends Logging {
    def convertSparkToCarbonDataType(
        dataType: org.apache.spark.sql.types.DataType): CarbonDataType = {
      dataType match {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/processing/src/main/java/org/carbondata/processing/csvload/DataGraphExecuter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/processing/src/main/java/org/carbondata/processing/csvload/GraphExecutionUtil.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInput.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/processing/src/main/java/org/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/97598381/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
----------------------------------------------------------------------


[12/20] incubator-carbondata git commit: [CARBONDATA-126] Stream not getting close for last block in data loading (#891)

Posted by ra...@apache.org.
[CARBONDATA-126] Stream not getting close for last block in data loading (#891)



Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/a724831e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/a724831e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/a724831e

Branch: refs/heads/master
Commit: a724831ed6ef124b029665a6f92b7e8a848c63a8
Parents: 75ddcce
Author: Kumar Vishal <ku...@gmail.com>
Authored: Thu Aug 4 13:14:50 2016 +0530
Committer: Venkata Ramana G <g....@gmail.com>
Committed: Thu Aug 4 13:14:50 2016 +0530

----------------------------------------------------------------------
 .../org/carbondata/processing/csvreaderstep/UnivocityCsvParser.java | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a724831e/processing/src/main/java/org/carbondata/processing/csvreaderstep/UnivocityCsvParser.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/csvreaderstep/UnivocityCsvParser.java b/processing/src/main/java/org/carbondata/processing/csvreaderstep/UnivocityCsvParser.java
index 9594a24..01b5bf0 100644
--- a/processing/src/main/java/org/carbondata/processing/csvreaderstep/UnivocityCsvParser.java
+++ b/processing/src/main/java/org/carbondata/processing/csvreaderstep/UnivocityCsvParser.java
@@ -153,6 +153,7 @@ public class UnivocityCsvParser {
   public boolean hasMoreRecords() throws IOException {
     row = parser.parseNext();
     if (row == null && blockCounter + 1 >= this.csvParserVo.getBlockDetailsList().size()) {
+      close();
       return false;
     }
     if (row == null) {


[06/20] incubator-carbondata git commit: [CARBONDATA-127] Decimal datatype result in cast exception in compaction (#897)

Posted by ra...@apache.org.
[CARBONDATA-127] Decimal datatype result in cast exception in compaction (#897)

* in case of decimal measure we will get spark Decimal data type.
but in writer it expects byte array. so converting decimal to byte array.

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/a7a5eb2c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/a7a5eb2c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/a7a5eb2c

Branch: refs/heads/master
Commit: a7a5eb2c69bc132f8a3b1bb65619c8f79a0186ac
Parents: ac8d866
Author: ravikiran23 <ra...@gmail.com>
Authored: Tue Aug 2 04:33:59 2016 +0530
Committer: Venkata Ramana G <g....@gmail.com>
Committed: Tue Aug 2 04:33:59 2016 +0530

----------------------------------------------------------------------
 .../MajorCompactionStopsAfterCompaction.scala   |  2 +-
 .../testsuite/joinquery/EquiJoinTestCase.scala  |  4 +++
 .../store/CarbonFactDataHandlerColumnar.java    | 28 +++++++++++++++++---
 3 files changed, 30 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a7a5eb2c/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
index 3cbf5dd..17f6bd3 100644
--- a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
+++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
@@ -23,7 +23,7 @@ class MajorCompactionStopsAfterCompaction extends QueryTest with BeforeAndAfterA
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "mm/dd/yyyy")
     sql(
-      "CREATE TABLE IF NOT EXISTS stopmajor (country String, ID Int, date Timestamp, name " +
+      "CREATE TABLE IF NOT EXISTS stopmajor (country String, ID decimal(7,4), date Timestamp, name " +
         "String, " +
         "phonetype String, serialname String, salary Int) STORED BY 'org.apache.carbondata" +
         ".format'"

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a7a5eb2c/integration/spark/src/test/scala/org/carbondata/spark/testsuite/joinquery/EquiJoinTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/joinquery/EquiJoinTestCase.scala b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/joinquery/EquiJoinTestCase.scala
index 3943352..37b8ebc 100644
--- a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/joinquery/EquiJoinTestCase.scala
+++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/joinquery/EquiJoinTestCase.scala
@@ -7,6 +7,10 @@ import org.apache.spark.sql.execution.joins.BroadCastFilterPushJoin
 
 class EquiJoinTestCase extends QueryTest with BeforeAndAfterAll  {
    override def beforeAll {
+    sql("drop table if exists employee_hive")
+    sql("drop table if exists mobile_hive")
+    sql("drop table if exists employee")
+    sql("drop table if exists mobile")
     //loading to hive table
     sql("create table employee_hive (empid string,empname string,mobilename string,mobilecolor string,salary int)row format delimited fields terminated by ','")
     sql("create table mobile_hive (mobileid string,mobilename string, mobilecolor string, sales int)row format delimited fields terminated by ','");

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a7a5eb2c/processing/src/main/java/org/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 2a43d23..abb4d01 100644
--- a/processing/src/main/java/org/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -76,6 +76,8 @@ import org.carbondata.processing.store.writer.NodeHolder;
 import org.carbondata.processing.store.writer.exception.CarbonDataWriterException;
 import org.carbondata.processing.util.RemoveDictionaryUtil;
 
+import org.apache.spark.sql.types.Decimal;
+
 /**
  * Fact data handler class to handle the fact data
  */
@@ -260,6 +262,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    * Segment properties
    */
   private SegmentProperties segmentProperties;
+  /**
+   * flag to check for compaction flow
+   */
+  private boolean compactionFlow;
 
   /**
    * CarbonFactDataHandler constructor
@@ -353,7 +359,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     dimensionType =
         CarbonUtil.identifyDimensionType(carbonTable.getDimensionByTableName(tableName));
 
-    if (carbonFactDataHandlerModel.isCompactionFlow()) {
+    this.compactionFlow = carbonFactDataHandlerModel.isCompactionFlow();
+    // in compaction flow the measure with decimal type will come as spark decimal.
+    // need to convert it to byte array.
+    if (compactionFlow) {
       try {
         numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
             .getProperty(CarbonCommonConstants.NUM_CORES_COMPACTING,
@@ -562,7 +571,12 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
           b = DataTypeUtil.bigDecimalToByte(val);
           nullValueIndexBitSet[customMeasureIndex[i]].set(count);
         } else {
-          b = (byte[]) row[customMeasureIndex[i]];
+          if (this.compactionFlow) {
+            BigDecimal bigDecimal = ((Decimal) row[customMeasureIndex[i]]).toJavaBigDecimal();
+            b = DataTypeUtil.bigDecimalToByte(bigDecimal);
+          } else {
+            b = (byte[]) row[customMeasureIndex[i]];
+          }
         }
         byteBuffer = ByteBuffer.allocate(b.length + CarbonCommonConstants.INT_SIZE_IN_BYTE);
         byteBuffer.putInt(b.length);
@@ -889,7 +903,15 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
           int num = (value % 1 == 0) ? 0 : CarbonCommonConstants.CARBON_DECIMAL_POINTERS_DEFAULT;
           decimal[count] = (decimal[count] > num ? decimal[count] : num);
         } else if (type[count] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-          byte[] buff = (byte[]) row[count];
+          byte[] buff = null;
+          // in compaction flow the measure with decimal type will come as spark decimal.
+          // need to convert it to byte array.
+          if (this.compactionFlow) {
+            BigDecimal bigDecimal = ((Decimal) row[count]).toJavaBigDecimal();
+            buff = DataTypeUtil.bigDecimalToByte(bigDecimal);
+          } else {
+            buff = (byte[]) row[count];
+          }
           BigDecimal value = DataTypeUtil.byteToBigDecimal(buff);
           BigDecimal minVal = (BigDecimal) min[count];
           min[count] = minVal.min(value);


[09/20] incubator-carbondata git commit: [CARBONDATA-134] changing store location to inside that of the container. (#901)

Posted by ra...@apache.org.
[CARBONDATA-134] changing store location to inside that of the container. (#901)

* changing store location to inside that of the container.
* deleting the empty folders of data load in temp location.
* Adding switch to configure to use LOCAL_DIRS or java temp dir, defaulting to java temp dir.

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/d70a6e55
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/d70a6e55
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/d70a6e55

Branch: refs/heads/master
Commit: d70a6e55514651afc3b780851c1cb918d1f7f1fd
Parents: b327375
Author: ravikiran23 <ra...@gmail.com>
Authored: Thu Aug 4 12:54:21 2016 +0530
Committer: Venkata Ramana G <g....@gmail.com>
Committed: Thu Aug 4 12:54:21 2016 +0530

----------------------------------------------------------------------
 .../carbondata/spark/load/CarbonLoaderUtil.java | 28 +++-----------------
 .../spark/rdd/CarbonDataLoadRDD.scala           | 21 +++++++++++----
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  | 21 +++++++++++----
 .../processing/csvreaderstep/CsvInput.java      | 24 +++++------------
 .../csvbased/CarbonCSVBasedSeqGenStep.java      | 23 +++++-----------
 5 files changed, 49 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d70a6e55/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
index bce99a7..e0e109d 100644
--- a/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
@@ -425,46 +425,26 @@ public final class CarbonLoaderUtil {
    * This method will delete the local data load folder location after data load is complete
    *
    * @param loadModel
-   * @param segmentName
    */
   public static void deleteLocalDataLoadFolderLocation(CarbonLoadModel loadModel,
-      String segmentName, boolean isCompactionFlow) {
+      boolean isCompactionFlow) {
     String databaseName = loadModel.getDatabaseName();
     String tableName = loadModel.getTableName();
-    CarbonTableIdentifier carbonTableIdentifier =
-        loadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier();
-    String segmentId = segmentName.substring(CarbonCommonConstants.LOAD_FOLDER.length());
     String tempLocationKey = databaseName + CarbonCommonConstants.UNDERSCORE + tableName
         + CarbonCommonConstants.UNDERSCORE + loadModel.getTaskNo();
     if (isCompactionFlow) {
       tempLocationKey = CarbonCommonConstants.COMPACTION_KEY_WORD + '_' + tempLocationKey;
     }
     // form local store location
-    String localStoreLocation = getStoreLocation(CarbonProperties.getInstance()
-            .getProperty(tempLocationKey, CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL),
-        carbonTableIdentifier, segmentId, loadModel.getPartitionId());
+    String localStoreLocation = CarbonProperties.getInstance()
+        .getProperty(tempLocationKey, CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL);
     try {
-      CarbonUtil.deleteFoldersAndFiles(new File[] { new File(localStoreLocation) });
+      CarbonUtil.deleteFoldersAndFiles(new File[] { new File(localStoreLocation).getParentFile()});
       LOGGER.info("Deleted the local store location" + localStoreLocation);
     } catch (CarbonUtilException e) {
       LOGGER.error(e, "Failed to delete local data load folder location");
     }
 
-    // delete ktr file.
-    if (!isCompactionFlow) {
-      String graphPath = CarbonProperties.getInstance()
-          .getProperty(tempLocationKey, CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL)
-          + File.separator + "/etl" + File.separator + databaseName + File.separator + tableName
-          + File.separator + loadModel.getSegmentId() + File.separator + loadModel.getTaskNo()
-          + File.separator + tableName + ".ktr";
-      File path = new File(graphPath);
-      if (path.exists()) {
-        if (!path.delete()) {
-          LOGGER.error("failed to delete the ktr file in path " + path);
-        }
-      }
-    }
-
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d70a6e55/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
index 2a19da9..b8cab78 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
@@ -175,11 +175,22 @@ class CarbonDataLoadRDD[K, V](
         CarbonProperties.getInstance().addProperty("high.cardinality.value", "100000")
         CarbonProperties.getInstance().addProperty("is.compressed.keyblock", "false")
         CarbonProperties.getInstance().addProperty("carbon.leaf.node.size", "120000")
-        val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
-        if (null != storeLocations && storeLocations.length > 0) {
-          storeLocation = storeLocations(Random.nextInt(storeLocations.length))
+
+        // this property is used to determine whether temp location for carbon is inside
+        // container temp dir or is yarn application directory.
+        val carbonUseLocalDir = CarbonProperties.getInstance()
+          .getProperty("carbon.use.local.dir", "false")
+
+        if(carbonUseLocalDir.equalsIgnoreCase("true")) {
+          val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
+          if (null != storeLocations && storeLocations.length > 0) {
+            storeLocation = storeLocations(Random.nextInt(storeLocations.length))
+          }
+          if (storeLocation == null) {
+            storeLocation = System.getProperty("java.io.tmpdir")
+          }
         }
-        if (storeLocation == null) {
+        else {
           storeLocation = System.getProperty("java.io.tmpdir")
         }
         storeLocation = storeLocation + '/' + System.nanoTime() + '/' + theSplit.index
@@ -211,7 +222,7 @@ class CarbonDataLoadRDD[K, V](
             try {
               val isCompaction = false
               CarbonLoaderUtil
-                .deleteLocalDataLoadFolderLocation(model, newSlice, isCompaction)
+                .deleteLocalDataLoadFolderLocation(model, isCompaction)
             } catch {
               case e: Exception =>
                 LOGGER.error(e)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d70a6e55/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala
index 7b15cbf..a3dd1c6 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -77,11 +77,22 @@ class CarbonMergerRDD[K, V](
         .getDatabaseName + '_' + carbonLoadModel
         .getTableName + '_' + carbonLoadModel.getTaskNo
 
-      val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
-      if (null != storeLocations && storeLocations.length > 0) {
-        storeLocation = storeLocations(Random.nextInt(storeLocations.length))
+      // this property is used to determine whether temp location for carbon is inside
+      // container temp dir or is yarn application directory.
+      val carbonUseLocalDir = CarbonProperties.getInstance()
+        .getProperty("carbon.use.local.dir", "false")
+
+      if(carbonUseLocalDir.equalsIgnoreCase("true")) {
+
+        val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
+        if (null != storeLocations && storeLocations.length > 0) {
+          storeLocation = storeLocations(Random.nextInt(storeLocations.length))
+        }
+        if (storeLocation == null) {
+          storeLocation = System.getProperty("java.io.tmpdir")
+        }
       }
-      if (storeLocation == null) {
+      else {
         storeLocation = System.getProperty("java.io.tmpdir")
       }
       storeLocation = storeLocation + '/' + System.nanoTime() + '/' + theSplit.index
@@ -177,7 +188,7 @@ class CarbonMergerRDD[K, V](
         try {
           val isCompactionFlow = true
           CarbonLoaderUtil
-            .deleteLocalDataLoadFolderLocation(carbonLoadModel, newSlice, isCompactionFlow)
+            .deleteLocalDataLoadFolderLocation(carbonLoadModel, isCompactionFlow)
         } catch {
           case e: Exception =>
             LOGGER.error(e)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d70a6e55/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInput.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInput.java b/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInput.java
index 3b69b4a..b865393 100644
--- a/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInput.java
+++ b/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInput.java
@@ -22,6 +22,7 @@ import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -397,27 +398,16 @@ public class CsvInput extends BaseStep implements StepInterface {
     }
 
     resultArray = results.toArray(new Future[results.size()]);
-    boolean completed = false;
     try {
-      while (!completed) {
-        completed = true;
-        for (int j = 0; j < resultArray.length; j++) {
-          if (!resultArray[j].isDone()) {
-            completed = false;
-          }
-
-        }
-        if (isTerminated) {
-          exec.shutdownNow();
-          throw new RuntimeException("Interrupted due to failing of other threads");
-        }
-        Thread.sleep(100);
-
+      for (int j = 0; j < resultArray.length; j++) {
+        resultArray[j].get();
       }
-    } catch (InterruptedException e) {
+    } catch (InterruptedException | ExecutionException e) {
       throw new RuntimeException("Thread InterruptedException", e);
     }
-    exec.shutdown();
+    finally {
+      exec.shutdownNow();
+    }
   }
 
   private void doProcessUnivocity() {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d70a6e55/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java b/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
index c2093ad..37c912e 100644
--- a/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
+++ b/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
@@ -37,6 +37,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -750,27 +751,15 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
     }
 
     this.resultArray = results.toArray(new Future[results.size()]);
-    boolean completed = false;
     try {
-      while (!completed) {
-        completed = true;
-        for (int j = 0; j < this.resultArray.length; j++) {
-          if (!this.resultArray[j].isDone()) {
-            completed = false;
-          }
-
-        }
-        if (isTerminated) {
-          exec.shutdownNow();
-          throw new RuntimeException("Interrupted due to failing of other threads");
-        }
-        Thread.sleep(100);
-
+      for (int j = 0; j < this.resultArray.length; j++) {
+        this.resultArray[j].get();
       }
-    } catch (InterruptedException e) {
+    } catch (InterruptedException | ExecutionException e) {
       throw new RuntimeException("Thread InterruptedException", e);
+    } finally {
+      exec.shutdownNow();
     }
-    exec.shutdown();
   }
 
   private int[] getUpdatedLens(int[] lens, boolean[] presentDims) {


[03/20] incubator-carbondata git commit: [CARBONDATA-129]Added null check before adding to carbonproperties. (#895)

Posted by ra...@apache.org.
[CARBONDATA-129]Added null check before adding to carbonproperties. (#895)

Added null check before adding to carbonproperties inorder to avoid npe if lock type is either local/zookeeper/hdfs .

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/c44bfd36
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/c44bfd36
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/c44bfd36

Branch: refs/heads/master
Commit: c44bfd36fc875ebb5bb71ec28f8f423002767158
Parents: 1695d60
Author: ashokblend <as...@gmail.com>
Authored: Mon Aug 1 19:22:52 2016 +0530
Committer: sujith71955 <su...@gmail.com>
Committed: Mon Aug 1 19:22:52 2016 +0530

----------------------------------------------------------------------
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala     | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c44bfd36/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index 70d0cb1..6768320 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -300,12 +300,18 @@ class CarbonGlobalDictionaryGenerateRDD(
       val pathService = CarbonCommonFactory.getPathService
       val carbonTablePath = pathService.getCarbonTablePath(model.columnIdentifier(split.index),
           model.hdfsLocation, model.table)
-      CarbonProperties.getInstance.addProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION,
-        model.hdfsTempLocation)
-      CarbonProperties.getInstance.addProperty(CarbonCommonConstants.LOCK_TYPE,
-        model.lockType)
-      CarbonProperties.getInstance.addProperty(CarbonCommonConstants.ZOOKEEPER_URL,
-        model.zooKeeperUrl)
+      if (null != model.hdfsTempLocation) {
+         CarbonProperties.getInstance.addProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION,
+           model.hdfsTempLocation)
+      }
+      if (null != model.lockType) {
+         CarbonProperties.getInstance.addProperty(CarbonCommonConstants.LOCK_TYPE,
+           model.lockType)
+      }
+      if (null != model.zooKeeperUrl) {
+         CarbonProperties.getInstance.addProperty(CarbonCommonConstants.ZOOKEEPER_URL,
+          model.zooKeeperUrl)
+      }
       val dictLock = CarbonLockFactory
         .getCarbonLockObj(carbonTablePath.getRelativeDictionaryDirectory,
           model.columnIdentifier(split.index).getColumnId + LockUsage.LOCK)


[04/20] incubator-carbondata git commit: [CARBONDATA-130]Adapt kettle home when "carbon.kettle.home" configuration is wrong (#889)

Posted by ra...@apache.org.
[CARBONDATA-130]Adapt kettle home when  "carbon.kettle.home" configuration is wrong (#889)

Generally kettle home is inside carbon lib so same can be looked in default

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/f9c2e57d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/f9c2e57d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/f9c2e57d

Branch: refs/heads/master
Commit: f9c2e57d670a51d5d8379f8b9b89a37b6909ce10
Parents: c44bfd3
Author: Gin-zhj <zh...@huawei.com>
Authored: Mon Aug 1 22:20:26 2016 +0800
Committer: Kumar Vishal <ku...@gmail.com>
Committed: Mon Aug 1 19:50:26 2016 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  4 ++
 .../execution/command/carbonTableSchema.scala   | 25 ++--------
 .../spark/rdd/CarbonDataRDDFactory.scala        | 10 +---
 .../carbondata/spark/util/CarbonScalaUtil.scala | 51 +++++++++++++++++++-
 4 files changed, 60 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f9c2e57d/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
index b45d9b4..724698f 100644
--- a/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
@@ -73,6 +73,10 @@ public final class CarbonCommonConstants {
    */
   public static final String STORE_LOCATION_DEFAULT_VAL = "../carbon.store";
   /**
+   * the folder name of kettle home path
+   */
+  public static final String KETTLE_HOME_NAME = "carbonplugins";
+  /**
    * CARDINALITY_INCREMENT_DEFAULT_VALUE
    */
   public static final int CARDINALITY_INCREMENT_VALUE_DEFAULT_VAL = 10;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f9c2e57d/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 1dd066f..cc08604 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -1230,14 +1230,8 @@ private[sql] case class AlterTableCompaction(alterTableModel: AlterTableModel) e
     carbonLoadModel.setStorePath(relation.cubeMeta.storePath)
 
     val partitioner = relation.cubeMeta.partitioner
+    val kettleHomePath = CarbonScalaUtil.getKettleHome(sqlContext)
 
-    var kettleHomePath = sqlContext.getConf("carbon.kettle.home", null)
-    if (null == kettleHomePath) {
-      kettleHomePath = CarbonProperties.getInstance.getProperty("carbon.kettle.home")
-    }
-    if (kettleHomePath == null) {
-      sys.error(s"carbon.kettle.home is not set")
-    }
     var storeLocation = CarbonProperties.getInstance
       .getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
         System.getProperty("java.io.tmpdir")
@@ -1524,13 +1518,7 @@ private[sql] case class LoadCube(
       storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
 
       val columinar = sqlContext.getConf("carbon.is.columnar.storage", "true").toBoolean
-      var kettleHomePath = sqlContext.getConf("carbon.kettle.home", null)
-      if (null == kettleHomePath) {
-        kettleHomePath = CarbonProperties.getInstance.getProperty("carbon.kettle.home")
-      }
-      if (kettleHomePath == null) {
-        sys.error(s"carbon.kettle.home is not set")
-      }
+      val kettleHomePath = CarbonScalaUtil.getKettleHome(sqlContext)
 
       val delimiter = partionValues.getOrElse("delimiter", ",")
       val quoteChar = partionValues.getOrElse("quotechar", "\"")
@@ -1721,13 +1709,8 @@ private[sql] case class LoadAggregationTable(
         System.getProperty("java.io.tmpdir"))
     storeLocation = storeLocation + "/carbonstore/" + System.currentTimeMillis()
     val columinar = sqlContext.getConf("carbon.is.columnar.storage", "true").toBoolean
-    var kettleHomePath = sqlContext.getConf("carbon.kettle.home", null)
-    if (null == kettleHomePath) {
-      kettleHomePath = CarbonProperties.getInstance.getProperty("carbon.kettle.home")
-    }
-    if (kettleHomePath == null) {
-      sys.error(s"carbon.kettle.home is not set")
-    }
+    val kettleHomePath = CarbonScalaUtil.getKettleHome(sqlContext)
+
     CarbonDataRDDFactory.loadCarbonData(
       sqlContext,
       carbonLoadModel,

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f9c2e57d/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 49e6702..f554a35 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -52,7 +52,7 @@ import org.carbondata.spark._
 import org.carbondata.spark.load._
 import org.carbondata.spark.merger.CarbonDataMergerUtil
 import org.carbondata.spark.splits.TableSplit
-import org.carbondata.spark.util.{CarbonQueryUtil, LoadMetadataUtil}
+import org.carbondata.spark.util.{CarbonQueryUtil, CarbonScalaUtil, LoadMetadataUtil}
 
 
 /**
@@ -255,13 +255,7 @@ object CarbonDataRDDFactory extends Logging {
         )
       storeLocation = storeLocation + "/carbonstore/" + System.currentTimeMillis()
       val columinar = sqlContext.getConf("carbon.is.columnar.storage", "true").toBoolean
-      var kettleHomePath = sqlContext.getConf("carbon.kettle.home", null)
-      if (null == kettleHomePath) {
-        kettleHomePath = CarbonProperties.getInstance.getProperty("carbon.kettle.home")
-      }
-      if (kettleHomePath == null) {
-        sys.error(s"carbon.kettle.home is not set")
-      }
+      val kettleHomePath = CarbonScalaUtil.getKettleHome(sqlContext)
       CarbonDataRDDFactory.loadCarbonData(
         sqlContext,
         carbonLoadModel,

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f9c2e57d/integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala
index b3effd3..87dd0ce 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala
@@ -17,8 +17,11 @@
 
 package org.carbondata.spark.util
 
+import java.io.File
+
 import scala.collection.JavaConverters._
 
+import org.apache.spark.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.command.Level
 import org.apache.spark.sql.hive.{CarbonMetaData, DictionaryMap}
@@ -28,9 +31,11 @@ import org.carbondata.core.carbon.metadata.datatype.DataType
 import org.carbondata.core.carbon.metadata.encoder.Encoding
 import org.carbondata.core.carbon.metadata.schema.table.CarbonTable
 import org.carbondata.core.constants.CarbonCommonConstants
+import org.carbondata.core.datastorage.store.impl.FileFactory
+import org.carbondata.core.util.CarbonProperties
 import org.carbondata.query.expression.{DataType => CarbonDataType}
 
-object CarbonScalaUtil {
+object CarbonScalaUtil extends Logging {
   def convertSparkToCarbonDataType(
       dataType: org.apache.spark.sql.types.DataType): CarbonDataType = {
     dataType match {
@@ -142,4 +147,48 @@ object CarbonScalaUtil {
     }
   }
 
+  def getKettleHome(sqlContext: SQLContext): String = {
+    var kettleHomePath = sqlContext.getConf("carbon.kettle.home", null)
+    if (null == kettleHomePath) {
+      kettleHomePath = CarbonProperties.getInstance.getProperty("carbon.kettle.home")
+    }
+    if (kettleHomePath != null) {
+      val sparkMaster = sqlContext.sparkContext.getConf.get("spark.master").toLowerCase()
+      // get spark master, if local, need to correct the kettle home
+      // e.g: --master local, the executor running in local machine
+      if (sparkMaster.startsWith("local")) {
+        val kettleHomeFileType = FileFactory.getFileType(kettleHomePath)
+        val kettleHomeFile = FileFactory.getCarbonFile(kettleHomePath, kettleHomeFileType)
+        // check if carbon.kettle.home path is exists
+        if (!kettleHomeFile.exists()) {
+          // get the path of this class
+          // e.g: file:/srv/bigdata/install/spark/sparkJdbc/carbonlib/carbon-
+          // xxx.jar!/org/carbondata/spark/rdd/
+          var jarFilePath = this.getClass.getResource("").getPath
+          val endIndex = jarFilePath.indexOf(".jar!") + 4
+          // get the jar file path
+          // e.g: file:/srv/bigdata/install/spark/sparkJdbc/carbonlib/carbon-*.jar
+          jarFilePath = jarFilePath.substring(0, endIndex)
+          val jarFileType = FileFactory.getFileType(jarFilePath)
+          val jarFile = FileFactory.getCarbonFile(jarFilePath, jarFileType)
+          // get the parent folder of the jar file
+          // e.g:file:/srv/bigdata/install/spark/sparkJdbc/carbonlib
+          val carbonLibPath = jarFile.getParentFile.getPath
+          // find the kettle home under the previous folder
+          // e.g:file:/srv/bigdata/install/spark/sparkJdbc/carbonlib/cabonplugins
+          kettleHomePath = carbonLibPath + File.separator + CarbonCommonConstants.KETTLE_HOME_NAME
+          logInfo(s"carbon.kettle.home path is not exists, reset it as $kettleHomePath")
+          val newKettleHomeFileType = FileFactory.getFileType(kettleHomePath)
+          val newKettleHomeFile = FileFactory.getCarbonFile(kettleHomePath, newKettleHomeFileType)
+          // check if the found kettle home exists
+          if (!newKettleHomeFile.exists()) {
+            sys.error("Kettle home not found. Failed to reset carbon.kettle.home")
+          }
+        }
+      }
+    } else {
+      sys.error("carbon.kettle.home is not set")
+    }
+    kettleHomePath
+  }
 }


[14/20] incubator-carbondata git commit: [CARBONDATA-139] Sortindex read by by appending offset from metadata (#905)

Posted by ra...@apache.org.
[CARBONDATA-139] Sortindex read by by appending offset from metadata (#905)

During sort index reading phase, read offset from dictionary chunk meta and then get respective file.
Corrected testcase

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/1721d40d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/1721d40d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/1721d40d

Branch: refs/heads/master
Commit: 1721d40d3d44d91c8f1115febc8a449071129f22
Parents: 07fe4d2
Author: ashokblend <as...@gmail.com>
Authored: Thu Aug 4 16:54:07 2016 +0530
Committer: Venkata Ramana G <g....@gmail.com>
Committed: Thu Aug 4 16:54:07 2016 +0530

----------------------------------------------------------------------
 .../CarbonDictionarySortIndexReaderImpl.java    | 43 ++++++++++++++++----
 ...CarbonDictionarySortIndexReaderImplTest.java | 15 ++++++-
 ...CarbonDictionarySortIndexWriterImplTest.java | 15 +++++++
 3 files changed, 65 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1721d40d/core/src/main/java/org/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReaderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReaderImpl.java b/core/src/main/java/org/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReaderImpl.java
index 54e8950..3ec6e7e 100644
--- a/core/src/main/java/org/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReaderImpl.java
+++ b/core/src/main/java/org/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReaderImpl.java
@@ -28,9 +28,11 @@ import org.carbondata.core.carbon.CarbonTableIdentifier;
 import org.carbondata.core.carbon.ColumnIdentifier;
 import org.carbondata.core.carbon.path.CarbonTablePath;
 import org.carbondata.core.datastorage.store.impl.FileFactory;
+import org.carbondata.core.reader.CarbonDictionaryColumnMetaChunk;
+import org.carbondata.core.reader.CarbonDictionaryMetadataReader;
+import org.carbondata.core.reader.CarbonDictionaryMetadataReaderImpl;
 import org.carbondata.core.reader.ThriftReader;
 import org.carbondata.core.service.PathService;
-import org.carbondata.core.util.CarbonUtil;
 import org.carbondata.format.ColumnSortInfo;
 
 import org.apache.thrift.TBase;
@@ -152,13 +154,14 @@ public class CarbonDictionarySortIndexReaderImpl implements CarbonDictionarySort
 
   protected void initPath() {
     PathService pathService = CarbonCommonFactory.getPathService();
-    CarbonTablePath carbonTablePath = pathService
-            .getCarbonTablePath(columnIdentifier, carbonStorePath, carbonTableIdentifier);
-    String dictionaryPath = carbonTablePath.getDictionaryFilePath(columnIdentifier.getColumnId());
-    long dictOffset = CarbonUtil.getFileSize(dictionaryPath);
-    this.sortIndexFilePath =
-        carbonTablePath.getSortIndexFilePath(columnIdentifier.getColumnId(), dictOffset);
+    CarbonTablePath carbonTablePath =
+        pathService.getCarbonTablePath(columnIdentifier, carbonStorePath, carbonTableIdentifier);
     try {
+      CarbonDictionaryColumnMetaChunk chunkMetaObjectForLastSegmentEntry =
+          getChunkMetaObjectForLastSegmentEntry();
+      long dictOffset = chunkMetaObjectForLastSegmentEntry.getEnd_offset();
+      this.sortIndexFilePath =
+          carbonTablePath.getSortIndexFilePath(columnIdentifier.getColumnId(), dictOffset);
       if (!FileFactory
           .isFileExist(this.sortIndexFilePath, FileFactory.getFileType(this.sortIndexFilePath))) {
         this.sortIndexFilePath =
@@ -171,6 +174,32 @@ public class CarbonDictionarySortIndexReaderImpl implements CarbonDictionarySort
   }
 
   /**
+   * This method will read the dictionary chunk metadata thrift object for last entry
+   *
+   * @return last entry of dictionary meta chunk
+   * @throws IOException if an I/O error occurs
+   */
+  private CarbonDictionaryColumnMetaChunk getChunkMetaObjectForLastSegmentEntry()
+      throws IOException {
+    CarbonDictionaryMetadataReader columnMetadataReaderImpl = getDictionaryMetadataReader();
+    try {
+      // read the last segment entry for dictionary metadata
+      return columnMetadataReaderImpl.readLastEntryOfDictionaryMetaChunk();
+    } finally {
+      // Close metadata reader
+      columnMetadataReaderImpl.close();
+    }
+  }
+
+  /**
+   * @return
+   */
+  protected CarbonDictionaryMetadataReader getDictionaryMetadataReader() {
+    return new CarbonDictionaryMetadataReaderImpl(carbonStorePath, carbonTableIdentifier,
+        columnIdentifier);
+  }
+
+  /**
    * This method will open the dictionary sort index file stream for reading
    *
    * @throws IOException in case any I/O errors occurs

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1721d40d/core/src/test/java/org/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReaderImplTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReaderImplTest.java b/core/src/test/java/org/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReaderImplTest.java
index 8b10c32..3accaa0 100644
--- a/core/src/test/java/org/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReaderImplTest.java
+++ b/core/src/test/java/org/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReaderImplTest.java
@@ -18,6 +18,7 @@
  */
 package org.carbondata.core.reader.sortindex;
 
+import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -27,6 +28,9 @@ import org.carbondata.core.carbon.CarbonTableIdentifier;
 import org.carbondata.core.carbon.ColumnIdentifier;
 import org.carbondata.core.datastorage.store.filesystem.CarbonFile;
 import org.carbondata.core.datastorage.store.impl.FileFactory;
+import org.carbondata.core.util.CarbonUtil;
+import org.carbondata.core.writer.CarbonDictionaryWriter;
+import org.carbondata.core.writer.CarbonDictionaryWriterImpl;
 import org.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWriter;
 import org.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWriterImpl;
 import org.apache.commons.lang.ArrayUtils;
@@ -60,10 +64,19 @@ public class CarbonDictionarySortIndexReaderImplTest {
     CarbonTableIdentifier carbonTableIdentifier = new CarbonTableIdentifier("testSchema", "carbon",
     		UUID.randomUUID().toString());
     ColumnIdentifier columnIdentifier = new ColumnIdentifier("Name", null, null);
+    CarbonDictionaryWriter dictionaryWriter = new CarbonDictionaryWriterImpl(hdfsStorePath,
+       carbonTableIdentifier, columnIdentifier);
+    String metaFolderPath =hdfsStorePath+File.separator+carbonTableIdentifier.getDatabaseName()+File.separator+carbonTableIdentifier.getTableName()+File.separator+"Metadata";
+    CarbonUtil.checkAndCreateFolder(metaFolderPath);
     CarbonDictionarySortIndexWriter dictionarySortIndexWriter =
         new CarbonDictionarySortIndexWriterImpl(carbonTableIdentifier, columnIdentifier, hdfsStorePath);
     List<int[]> expectedData = prepareExpectedData();
-
+    int[] data = expectedData.get(0);
+    for(int i=0;i<data.length;i++) {
+    	dictionaryWriter.write(String.valueOf(data[i]));
+    }
+    dictionaryWriter.close();
+    dictionaryWriter.commit();
     List<Integer> sortIndex = Arrays.asList(ArrayUtils.toObject(expectedData.get(0)));
     List<Integer> invertedSortIndex = Arrays.asList(ArrayUtils.toObject(expectedData.get(1)));
     dictionarySortIndexWriter.writeSortIndex(sortIndex);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1721d40d/core/src/test/java/org/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImplTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImplTest.java b/core/src/test/java/org/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImplTest.java
index b814fa8..7a28da5 100644
--- a/core/src/test/java/org/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImplTest.java
+++ b/core/src/test/java/org/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImplTest.java
@@ -18,6 +18,7 @@
  */
 package org.carbondata.core.writer.sortindex;
 
+import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -29,6 +30,9 @@ import org.carbondata.core.datastorage.store.filesystem.CarbonFile;
 import org.carbondata.core.datastorage.store.impl.FileFactory;
 import org.carbondata.core.reader.sortindex.CarbonDictionarySortIndexReader;
 import org.carbondata.core.reader.sortindex.CarbonDictionarySortIndexReaderImpl;
+import org.carbondata.core.util.CarbonUtil;
+import org.carbondata.core.writer.CarbonDictionaryWriter;
+import org.carbondata.core.writer.CarbonDictionaryWriterImpl;
 import org.apache.commons.lang.ArrayUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -62,9 +66,20 @@ public class CarbonDictionarySortIndexWriterImplTest {
     CarbonTableIdentifier carbonTableIdentifier = new CarbonTableIdentifier("testSchema", "carbon", UUID.randomUUID().toString());
     ColumnIdentifier columnIdentifier = new ColumnIdentifier("Name", null, null);
 
+    String metaFolderPath =hdfsStorePath+File.separator+carbonTableIdentifier.getDatabaseName()+File.separator+carbonTableIdentifier.getTableName()+File.separator+"Metadata";
+    CarbonUtil.checkAndCreateFolder(metaFolderPath);
+    CarbonDictionaryWriter dictionaryWriter = new CarbonDictionaryWriterImpl(hdfsStorePath,
+    	       carbonTableIdentifier, columnIdentifier);
     CarbonDictionarySortIndexWriter dictionarySortIndexWriter =
         new CarbonDictionarySortIndexWriterImpl(carbonTableIdentifier, columnIdentifier, storePath);
     List<int[]> indexList = prepareExpectedData();
+    int[] data = indexList.get(0);
+    for(int i=0;i<data.length;i++) {
+    	dictionaryWriter.write(String.valueOf(data[i]));
+    }
+    dictionaryWriter.close();
+    dictionaryWriter.commit();
+    
     List<Integer> sortIndex = Arrays.asList(ArrayUtils.toObject(indexList.get(0)));
     List<Integer> invertedSortIndex = Arrays.asList(ArrayUtils.toObject(indexList.get(1)));
     dictionarySortIndexWriter.writeSortIndex(sortIndex);


[10/20] incubator-carbondata git commit: [CARBONDATA-137] Fixed detail limit query statistics issue (#904)

Posted by ra...@apache.org.
[CARBONDATA-137] Fixed detail limit query statistics issue (#904)

In case of detail query with limit it total query execution time in statistics is printing negative values as wrong method is getting called

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/b4ae2eb7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/b4ae2eb7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/b4ae2eb7

Branch: refs/heads/master
Commit: b4ae2eb73b6dbc6f25717bfeaa9c5f55fb471c21
Parents: d70a6e5
Author: Kumar Vishal <ku...@gmail.com>
Authored: Thu Aug 4 12:55:42 2016 +0530
Committer: Venkata Ramana G <g....@gmail.com>
Committed: Thu Aug 4 12:55:42 2016 +0530

----------------------------------------------------------------------
 .../src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b4ae2eb7/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala
index 9eb680c..8a545e8 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala
@@ -232,7 +232,7 @@ class CarbonQueryRDD[V: ClassTag](
           if (null != queryModel.getStatisticsRecorder) {
             val queryStatistic = new QueryStatistic
             queryStatistic
-              .addStatistics("Total Time taken to execute the query in executor Side",
+              .addFixedTimeStatistic("Total Time taken to execute the query in executor Side",
                 System.currentTimeMillis - queryStartTime
               )
             queryModel.getStatisticsRecorder.recordStatistics(queryStatistic);
@@ -253,7 +253,7 @@ class CarbonQueryRDD[V: ClassTag](
           if (null != queryModel.getStatisticsRecorder) {
             val queryStatistic = new QueryStatistic
             queryStatistic
-              .addStatistics("Total Time taken to execute the query in executor Side",
+              .addFixedTimeStatistic("Total Time taken to execute the query in executor Side",
                 System.currentTimeMillis - queryStartTime
               )
             queryModel.getStatisticsRecorder.recordStatistics(queryStatistic);


[07/20] incubator-carbondata git commit: Deleted load/loads support from DDL commands (#899)

Posted by ra...@apache.org.
Deleted load/loads support from DDL commands (#899)

 Deleted load/loads support from DDL commands for the ddl compatibility

* Added test cases


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/627590e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/627590e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/627590e1

Branch: refs/heads/master
Commit: 627590e13b68891b3e25ed77ad398490a9358b36
Parents: a7a5eb2
Author: Manu <ma...@gmail.com>
Authored: Tue Aug 2 20:59:33 2016 +0530
Committer: sujith71955 <su...@gmail.com>
Committed: Tue Aug 2 20:59:33 2016 +0530

----------------------------------------------------------------------
 .../allqueries/AllDataTypesTestCase4.scala      |  4 +--
 .../allqueries/AllDataTypesTestCase6.scala      | 16 +++++-----
 .../org/apache/spark/sql/CarbonSqlParser.scala  |  7 ++---
 .../dataretention/DataRetentionTestCase.scala   | 33 +++++++++++++++++++-
 4 files changed, 45 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/627590e1/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase4.scala
----------------------------------------------------------------------
diff --git a/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase4.scala b/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase4.scala
index 7b29afd..262474c 100644
--- a/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase4.scala
+++ b/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase4.scala
@@ -1447,7 +1447,7 @@ class AllDataTypesTestCase4 extends QueryTest with BeforeAndAfterAll {
         "stored by 'org.apache.carbondata.format'"
       )
       checkAnswer(
-        sql("show loads for table tabledoesnotexist"),
+        sql("show segments for table tabledoesnotexist"),
         Seq()
       )
       fail("Unexpected behavior")
@@ -1467,7 +1467,7 @@ class AllDataTypesTestCase4 extends QueryTest with BeforeAndAfterAll {
         "stored by 'org.apache.carbondata.format'"
       )
       checkAnswer(
-        sql("show loads for table"),
+        sql("show segments for table"),
         Seq()
       )
       fail("Unexpected behavior")

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/627590e1/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase6.scala
----------------------------------------------------------------------
diff --git a/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase6.scala b/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase6.scala
index 393b907..de89a34 100644
--- a/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase6.scala
+++ b/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase6.scala
@@ -1785,7 +1785,7 @@ class AllDataTypesTestCase6 extends QueryTest with BeforeAndAfterAll {
   test("TC_1252") {
     sql("create table table7 (imei string,AMSize string,channelsId string,ActiveCountry string, Activecity string,gamePointId decimal,deviceInformationId INT) stored by 'org.apache.carbondata.format'")
     checkAnswer(
-      sql("show loads for table table7"),
+      sql("show segments for table table7"),
       Seq())
     sql("drop table table7")
   }
@@ -1794,7 +1794,7 @@ class AllDataTypesTestCase6 extends QueryTest with BeforeAndAfterAll {
   test("TC_1253") {
     sql("create table table123 (imei string,deviceInformationId INT,MAC string,deviceColor string, device_backColor string,modelId string, marketName string, AMSize string, ROMSize string, CUPAudit string, CPIClocked string, series string, productionDate string, bomCode string, internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince  string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict  string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitione
 dVersions string, Latest_YEAR  INT, Latest_MONTH INT, Latest_DAY INT, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId decimal,contractNumber decimal) stored by 'org.apache.carbondata.format'")
     checkAnswer(
-      sql("show loads for table table123"),
+      sql("show segments for table table123"),
       Seq())
     sql("drop table table123")
   }
@@ -1805,7 +1805,7 @@ class AllDataTypesTestCase6 extends QueryTest with BeforeAndAfterAll {
     sql("LOAD DATA LOCAL INPATH  './src/test/resources/TestData1.csv' INTO table table9 OPTIONS('DELIMITER'= ',' ,'QUOTECHAR'= '\"\"', 'FILEHEADER'= 'imei,deviceInformationId,AMSize,channelsId,ActiveCountry,Activecity,gamePointId')")
     sql("LOAD DATA LOCAL INPATH  './src/test/resources/TestData1.csv' INTO table table9 OPTIONS('DELIMITER'= ',' ,'QUOTECHAR'= '\"\"', 'FILEHEADER'= 'imei,deviceInformationId,AMSize,channelsId,ActiveCountry,Activecity,gamePointId')")
     checkAnswer(
-      sql("show loads for table table9"),
+      sql("show segments for table table9"),
       Seq(Row("0","Success","2015-11-05 17:43:21.0"," 2015-11-05 17:43:22.0"),Row("1","Success","2015-11-05 17:43:43.0"," 2015-11-05 17:43:44.0")))
     sql("drop table table9")
   }
@@ -1814,7 +1814,7 @@ class AllDataTypesTestCase6 extends QueryTest with BeforeAndAfterAll {
   test("TC_1257") {
     sql("create table table12 (imei string,AMSize string,channelsId string,ActiveCountry string, Activecity string,gamePointId decimal,deviceInformationId INT) stored by 'org.apache.carbondata.format'")
     checkAnswer(
-      sql("show loads for table table12"),
+      sql("show segments for table table12"),
       Seq())
     sql("drop table table12")
   }
@@ -1824,7 +1824,7 @@ class AllDataTypesTestCase6 extends QueryTest with BeforeAndAfterAll {
     sql("create table table13 (imei string,AMSize string,channelsId string,ActiveCountry string, Activecity string,gamePointId decimal,deviceInformationId INT) stored by 'org.apache.carbondata.format'")
     sql("LOAD DATA LOCAL INPATH  './src/test/resources/TestData1.csv' INTO table table13 OPTIONS('DELIMITER'= ',' ,'QUOTECHAR'= '\"\"', 'FILEHEADER'= 'imei,deviceInformationId,AMSize,channelsId,ActiveCountry,Activecity,gamePointId')")
     checkAnswer(
-      sql("sHOw LoaDs for table table13"),
+      sql("sHOw segMents for table table13"),
       Seq(Row("0","Success","2015-11-05 18:09:40.0"," 2015-11-05 18:09:41.0")))
     sql("drop table table13")
   }
@@ -1837,7 +1837,7 @@ class AllDataTypesTestCase6 extends QueryTest with BeforeAndAfterAll {
     sql("LOAD DATA LOCAL INPATH  './src/test/resources/TestData1.csv' INTO table table14 OPTIONS('DELIMITER'= ',' ,'QUOTECHAR'= '\"\"', 'FILEHEADER'= 'imei,deviceInformationId,AMSize,channelsId,ActiveCountry,Activecity,gamePointId')")
     sql("select * from table14")
     checkAnswer(
-      sql("show loads for table table14"),
+      sql("show segments for table table14"),
       Seq(Row("1","Success","2015-11-05 17:43:21.0"," 2015-11-05 17:43:22.0"),Row("0","Success","2015-11-05 17:43:43.0"," 2015-11-05 17:43:44.0")))
     sql("drop table table14")
   }
@@ -1850,7 +1850,7 @@ class AllDataTypesTestCase6 extends QueryTest with BeforeAndAfterAll {
     sql("LOAD DATA LOCAL INPATH  './src/test/resources/TestData1.csv' INTO table table15 OPTIONS('DELIMITER'= ',' ,'QUOTECHAR'= '\"\"', 'FILEHEADER'= 'imei,deviceInformationId,AMSize,channelsId,ActiveCountry,Activecity,gamePointId')")
     sql("LOAD DATA LOCAL INPATH  './src/test/resources/TestData1.csv' INTO table table15 OPTIONS('DELIMITER'= ',' ,'QUOTECHAR'= '\"\"', 'FILEHEADER'= 'imei,deviceInformationId,AMSize,channelsId,ActiveCountry,Activecity,gamePointId')")
     checkAnswer(
-      sql("show loads for table table15"),
+      sql("show segments for table table15"),
       Seq(Row("3","Success","2015-11-05 17:43:21.0"," 2015-11-05 17:43:22.0"),Row("2","Success","2015-11-05 17:43:21.0"," 2015-11-05 17:43:22.0"),Row("1","Success","2015-11-05 17:43:21.0"," 2015-11-05 17:43:22.0"),Row("0","Success","2015-11-05 17:43:43.0"," 2015-11-05 17:43:44.0")))
     sql("drop table table15")
   }
@@ -1860,7 +1860,7 @@ class AllDataTypesTestCase6 extends QueryTest with BeforeAndAfterAll {
     sql("create table t202 (imei string,deviceInformationId INT,mac string,productdate timestamp,updatetime timestamp,gamePointId decimal,contractNumber decimal) stored by 'org.apache.carbondata.format'")
     sql("LOAD DATA LOCAL INPATH './src/test/resources/test1t.csv' INTO table t202 OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"\"', 'FILEHEADER'= 'imei,deviceInformationId,mac,productdate,updatetime,gamePointId,contractNumber')")
     checkAnswer(
-      sql("show loads for table t202"),
+      sql("show segments for table t202"),
       Seq())
     sql("drop table t202")
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/627590e1/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
index 5c25406..e0ccd2b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
@@ -96,7 +96,6 @@ class CarbonSqlParser()
   protected val LEVELS = carbonKeyWord("LEVELS")
   protected val LIKE = carbonKeyWord("LIKE")
   protected val LOAD = carbonKeyWord("LOAD")
-  protected val LOADS = carbonKeyWord("LOADS")
   protected val LOCAL = carbonKeyWord("LOCAL")
   protected val MAPPED = carbonKeyWord("MAPPED")
   protected val MEASURES = carbonKeyWord("MEASURES")
@@ -1309,7 +1308,7 @@ class CarbonSqlParser()
   }
 
   protected lazy val showLoads: Parser[LogicalPlan] =
-    SHOW ~> (LOADS|SEGMENTS) ~> FOR ~> TABLE ~> (ident <~ ".").? ~ ident ~
+    SHOW ~> SEGMENTS ~> FOR ~> TABLE ~> (ident <~ ".").? ~ ident ~
       (LIMIT ~> numericLit).? <~
       opt(";") ^^ {
       case schemaName ~ cubeName ~ limit =>
@@ -1322,7 +1321,7 @@ class CarbonSqlParser()
       )
 
   protected lazy val deleteLoadsByID: Parser[LogicalPlan] =
-    DELETE ~> (LOAD|SEGMENT) ~> repsep(segmentId, ",") ~ (FROM ~> TABLE ~>
+    DELETE ~> SEGMENT ~> repsep(segmentId, ",") ~ (FROM ~> TABLE ~>
       (ident <~ ".").? ~ ident) <~
       opt(";") ^^ {
       case loadids ~ cube => cube match {
@@ -1331,7 +1330,7 @@ class CarbonSqlParser()
     }
 
   protected lazy val deleteLoadsByLoadDate: Parser[LogicalPlan] =
-    DELETE ~> (LOADS|SEGMENTS) ~> FROM ~> TABLE ~> (ident <~ ".").? ~ ident ~
+    DELETE ~> SEGMENTS ~> FROM ~> TABLE ~> (ident <~ ".").? ~ ident ~
       (WHERE ~> (STARTTIME <~ BEFORE) ~ stringLit) <~
       opt(";") ^^ {
       case schema ~ cube ~ condition =>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/627590e1/integration/spark/src/test/scala/org/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
index 683dcf8..2c359f9 100644
--- a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
+++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
@@ -133,7 +133,7 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll {
 
   test("RetentionTest3_DeleteByLoadId") {
     // delete segment 2 and load ind segment
-    sql("DELETE LOAD 2 FROM TABLE dataretentionTable")
+    sql("DELETE SEGMENT 2 FROM TABLE dataretentionTable")
     sql(
       "LOAD DATA LOCAL INPATH '" + resource + "dataretention1.csv' INTO TABLE dataretentionTable " +
       "OPTIONS('DELIMITER' = ',')")
@@ -223,4 +223,35 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll {
 
   }
 
+  test("RetentionTest_InvalidDeleteCommands") {
+
+    // All these queries should fail.
+    try {
+      sql("DELETE LOADS FROM TABLE dataretentionTable where STARTTIME before '2099-01-01'")
+      throw new MalformedCarbonCommandException("Invalid query")
+    } catch {
+      case e: MalformedCarbonCommandException =>
+        assert(!e.getMessage.equalsIgnoreCase("Invalid query"))
+      case _ => assert(true)
+    }
+
+    try {
+      sql("DELETE LOAD 2 FROM TABLE dataretentionTable")
+      throw new MalformedCarbonCommandException("Invalid query")
+    } catch {
+      case e: MalformedCarbonCommandException =>
+        assert(!e.getMessage.equalsIgnoreCase("Invalid query"))
+      case _ => assert(true)
+    }
+
+    try {
+      sql("show loads for table dataretentionTable")
+      throw new MalformedCarbonCommandException("Invalid query")
+    } catch {
+      case e: MalformedCarbonCommandException =>
+        assert(!e.getMessage.equalsIgnoreCase("Invalid query"))
+      case _ => assert(true)
+    }
+
+  }
 }


[19/20] incubator-carbondata git commit: Commented testcase to avoid failing becuase of stale store

Posted by ra...@apache.org.
Commented testcase to avoid failing becuase of stale store


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/1698b1a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/1698b1a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/1698b1a9

Branch: refs/heads/master
Commit: 1698b1a9751c24d6720f0f5aa2c4a7979b8486fd
Parents: fd5493b
Author: ravipesala <ra...@gmail.com>
Authored: Sat Aug 6 14:21:28 2016 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Sat Aug 6 14:21:28 2016 +0530

----------------------------------------------------------------------
 .../carbon/datastore/BlockIndexStoreTest.java   | 408 +++++++++----------
 1 file changed, 204 insertions(+), 204 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1698b1a9/core/src/test/java/org/carbondata/core/carbon/datastore/BlockIndexStoreTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/carbondata/core/carbon/datastore/BlockIndexStoreTest.java b/core/src/test/java/org/carbondata/core/carbon/datastore/BlockIndexStoreTest.java
index ab8e73d..0ee4a81 100644
--- a/core/src/test/java/org/carbondata/core/carbon/datastore/BlockIndexStoreTest.java
+++ b/core/src/test/java/org/carbondata/core/carbon/datastore/BlockIndexStoreTest.java
@@ -1,204 +1,204 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.carbondata.core.carbon.datastore;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.carbondata.core.carbon.AbsoluteTableIdentifier;
-import org.carbondata.core.carbon.CarbonTableIdentifier;
-import org.carbondata.core.carbon.datastore.block.AbstractIndex;
-import org.carbondata.core.carbon.datastore.block.TableBlockInfo;
-import org.carbondata.core.carbon.datastore.exception.IndexBuilderException;
-
-import junit.framework.TestCase;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class BlockIndexStoreTest extends TestCase {
-
-  private BlockIndexStore indexStore;
-
-  @BeforeClass public void setUp() {
-    indexStore = BlockIndexStore.getInstance();
-  }
-
-  @Test public void testloadAndGetTaskIdToSegmentsMapForSingleSegment() throws IOException {
-    String canonicalPath =
-        new File(this.getClass().getResource("/").getPath() + "/../../").getCanonicalPath();
-    File file = new File(canonicalPath + "/src/test/resources/part-0-0-1466029397000.carbondata");
-    TableBlockInfo info =
-        new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
-            file.length());
-    CarbonTableIdentifier carbonTableIdentifier = new CarbonTableIdentifier("default", "t3", "1");
-    AbsoluteTableIdentifier absoluteTableIdentifier =
-        new AbsoluteTableIdentifier("/src/test/resources", carbonTableIdentifier);
-    try {
-      List<AbstractIndex> loadAndGetBlocks = indexStore
-          .loadAndGetBlocks(Arrays.asList(new TableBlockInfo[] { info }), absoluteTableIdentifier);
-      assertTrue(loadAndGetBlocks.size() == 1);
-    } catch (IndexBuilderException e) {
-      assertTrue(false);
-    }
-    indexStore.clear(absoluteTableIdentifier);
-  }
-
-  @Test public void testloadAndGetTaskIdToSegmentsMapForSameBlockLoadedConcurrently()
-      throws IOException {
-    String canonicalPath =
-        new File(this.getClass().getResource("/").getPath() + "/../../").getCanonicalPath();
-    File file = new File(canonicalPath + "/src/test/resources/part-0-0-1466029397000.carbondata");
-    TableBlockInfo info =
-        new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
-            file.length());
-    TableBlockInfo info1 =
-        new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
-            file.length());
-
-    TableBlockInfo info2 =
-        new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
-            file.length());
-    TableBlockInfo info3 =
-        new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
-            file.length());
-    TableBlockInfo info4 =
-        new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
-            file.length());
-
-    CarbonTableIdentifier carbonTableIdentifier = new CarbonTableIdentifier("default", "t3", "1");
-    AbsoluteTableIdentifier absoluteTableIdentifier =
-        new AbsoluteTableIdentifier("/src/test/resources", carbonTableIdentifier);
-    ExecutorService executor = Executors.newFixedThreadPool(3);
-    executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info, info1 }),
-        absoluteTableIdentifier));
-    executor.submit(
-        new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info2, info3, info4 }),
-            absoluteTableIdentifier));
-    executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info, info1 }),
-        absoluteTableIdentifier));
-    executor.submit(
-        new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info2, info3, info4 }),
-            absoluteTableIdentifier));
-    executor.shutdown();
-    try {
-      executor.awaitTermination(1, TimeUnit.DAYS);
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    }
-
-    try {
-      List<AbstractIndex> loadAndGetBlocks = indexStore.loadAndGetBlocks(
-          Arrays.asList(new TableBlockInfo[] { info, info1, info2, info3, info4 }),
-          absoluteTableIdentifier);
-      assertTrue(loadAndGetBlocks.size() == 5);
-    } catch (IndexBuilderException e) {
-      assertTrue(false);
-    }
-    indexStore.clear(absoluteTableIdentifier);
-  }
-
-  @Test public void testloadAndGetTaskIdToSegmentsMapForDifferentSegmentLoadedConcurrently()
-      throws IOException {
-    String canonicalPath =
-        new File(this.getClass().getResource("/").getPath() + "/../../").getCanonicalPath();
-    File file = new File(canonicalPath + "/src/test/resources/part-0-0-1466029397000.carbondata");
-    TableBlockInfo info =
-        new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
-            file.length());
-    TableBlockInfo info1 =
-        new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
-            file.length());
-
-    TableBlockInfo info2 =
-        new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
-            file.length());
-    TableBlockInfo info3 =
-        new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
-            file.length());
-    TableBlockInfo info4 =
-        new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
-            file.length());
-
-    TableBlockInfo info5 =
-        new TableBlockInfo(file.getAbsolutePath(), 0, "2", new String[] { "loclhost" },
-            file.length());
-    TableBlockInfo info6 =
-        new TableBlockInfo(file.getAbsolutePath(), 0, "2", new String[] { "loclhost" },
-            file.length());
-
-    TableBlockInfo info7 =
-        new TableBlockInfo(file.getAbsolutePath(), 0, "3", new String[] { "loclhost" },
-            file.length());
-
-    CarbonTableIdentifier carbonTableIdentifier = new CarbonTableIdentifier("default", "t3", "1");
-    AbsoluteTableIdentifier absoluteTableIdentifier =
-        new AbsoluteTableIdentifier("/src/test/resources", carbonTableIdentifier);
-    ExecutorService executor = Executors.newFixedThreadPool(3);
-    executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info, info1 }),
-        absoluteTableIdentifier));
-    executor.submit(
-        new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info2, info3, info4 }),
-            absoluteTableIdentifier));
-    executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info5, info6 }),
-        absoluteTableIdentifier));
-    executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info7 }),
-        absoluteTableIdentifier));
-
-    executor.shutdown();
-    try {
-      executor.awaitTermination(1, TimeUnit.DAYS);
-    } catch (InterruptedException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    }
-    try {
-      List<AbstractIndex> loadAndGetBlocks = indexStore.loadAndGetBlocks(Arrays
-              .asList(new TableBlockInfo[] { info, info1, info2, info3, info4, info5, info6, info7 }),
-          absoluteTableIdentifier);
-      assertTrue(loadAndGetBlocks.size() == 8);
-    } catch (IndexBuilderException e) {
-      assertTrue(false);
-    }
-    indexStore.clear(absoluteTableIdentifier);
-  }
-
-  private class BlockLoaderThread implements Callable<Void> {
-    private List<TableBlockInfo> tableBlockInfoList;
-    private AbsoluteTableIdentifier absoluteTableIdentifier;
-
-    public BlockLoaderThread(List<TableBlockInfo> tableBlockInfoList,
-        AbsoluteTableIdentifier absoluteTableIdentifier) {
-      // TODO Auto-generated constructor stub
-      this.tableBlockInfoList = tableBlockInfoList;
-      this.absoluteTableIdentifier = absoluteTableIdentifier;
-    }
-
-    @Override public Void call() throws Exception {
-      indexStore.loadAndGetBlocks(tableBlockInfoList, absoluteTableIdentifier);
-      return null;
-    }
-
-  }
-}
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License.  You may obtain a copy of the License at
+// *
+// *    http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing,
+// * software distributed under the License is distributed on an
+// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// * KIND, either express or implied.  See the License for the
+// * specific language governing permissions and limitations
+// * under the License.
+// */
+//package org.carbondata.core.carbon.datastore;
+//
+//import java.io.File;
+//import java.io.IOException;
+//import java.util.Arrays;
+//import java.util.List;
+//import java.util.concurrent.Callable;
+//import java.util.concurrent.ExecutorService;
+//import java.util.concurrent.Executors;
+//import java.util.concurrent.TimeUnit;
+//
+//import org.carbondata.core.carbon.AbsoluteTableIdentifier;
+//import org.carbondata.core.carbon.CarbonTableIdentifier;
+//import org.carbondata.core.carbon.datastore.block.AbstractIndex;
+//import org.carbondata.core.carbon.datastore.block.TableBlockInfo;
+//import org.carbondata.core.carbon.datastore.exception.IndexBuilderException;
+//
+//import junit.framework.TestCase;
+//import org.junit.BeforeClass;
+//import org.junit.Test;
+//
+//public class BlockIndexStoreTest extends TestCase {
+//
+//  private BlockIndexStore indexStore;
+//
+//  @BeforeClass public void setUp() {
+//    indexStore = BlockIndexStore.getInstance();
+//  }
+//
+//  @Test public void testloadAndGetTaskIdToSegmentsMapForSingleSegment() throws IOException {
+//    String canonicalPath =
+//        new File(this.getClass().getResource("/").getPath() + "/../../").getCanonicalPath();
+//    File file = new File(canonicalPath + "/src/test/resources/part-0-0-1466029397000.carbondata");
+//    TableBlockInfo info =
+//        new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
+//            file.length());
+//    CarbonTableIdentifier carbonTableIdentifier = new CarbonTableIdentifier("default", "t3", "1");
+//    AbsoluteTableIdentifier absoluteTableIdentifier =
+//        new AbsoluteTableIdentifier("/src/test/resources", carbonTableIdentifier);
+//    try {
+//      List<AbstractIndex> loadAndGetBlocks = indexStore
+//          .loadAndGetBlocks(Arrays.asList(new TableBlockInfo[] { info }), absoluteTableIdentifier);
+//      assertTrue(loadAndGetBlocks.size() == 1);
+//    } catch (IndexBuilderException e) {
+//      assertTrue(false);
+//    }
+//    indexStore.clear(absoluteTableIdentifier);
+//  }
+//
+//  @Test public void testloadAndGetTaskIdToSegmentsMapForSameBlockLoadedConcurrently()
+//      throws IOException {
+//    String canonicalPath =
+//        new File(this.getClass().getResource("/").getPath() + "/../../").getCanonicalPath();
+//    File file = new File(canonicalPath + "/src/test/resources/part-0-0-1466029397000.carbondata");
+//    TableBlockInfo info =
+//        new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
+//            file.length());
+//    TableBlockInfo info1 =
+//        new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
+//            file.length());
+//
+//    TableBlockInfo info2 =
+//        new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
+//            file.length());
+//    TableBlockInfo info3 =
+//        new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
+//            file.length());
+//    TableBlockInfo info4 =
+//        new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
+//            file.length());
+//
+//    CarbonTableIdentifier carbonTableIdentifier = new CarbonTableIdentifier("default", "t3", "1");
+//    AbsoluteTableIdentifier absoluteTableIdentifier =
+//        new AbsoluteTableIdentifier("/src/test/resources", carbonTableIdentifier);
+//    ExecutorService executor = Executors.newFixedThreadPool(3);
+//    executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info, info1 }),
+//        absoluteTableIdentifier));
+//    executor.submit(
+//        new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info2, info3, info4 }),
+//            absoluteTableIdentifier));
+//    executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info, info1 }),
+//        absoluteTableIdentifier));
+//    executor.submit(
+//        new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info2, info3, info4 }),
+//            absoluteTableIdentifier));
+//    executor.shutdown();
+//    try {
+//      executor.awaitTermination(1, TimeUnit.DAYS);
+//    } catch (InterruptedException e) {
+//      e.printStackTrace();
+//    }
+//
+//    try {
+//      List<AbstractIndex> loadAndGetBlocks = indexStore.loadAndGetBlocks(
+//          Arrays.asList(new TableBlockInfo[] { info, info1, info2, info3, info4 }),
+//          absoluteTableIdentifier);
+//      assertTrue(loadAndGetBlocks.size() == 5);
+//    } catch (IndexBuilderException e) {
+//      assertTrue(false);
+//    }
+//    indexStore.clear(absoluteTableIdentifier);
+//  }
+//
+//  @Test public void testloadAndGetTaskIdToSegmentsMapForDifferentSegmentLoadedConcurrently()
+//      throws IOException {
+//    String canonicalPath =
+//        new File(this.getClass().getResource("/").getPath() + "/../../").getCanonicalPath();
+//    File file = new File(canonicalPath + "/src/test/resources/part-0-0-1466029397000.carbondata");
+//    TableBlockInfo info =
+//        new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
+//            file.length());
+//    TableBlockInfo info1 =
+//        new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
+//            file.length());
+//
+//    TableBlockInfo info2 =
+//        new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
+//            file.length());
+//    TableBlockInfo info3 =
+//        new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
+//            file.length());
+//    TableBlockInfo info4 =
+//        new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
+//            file.length());
+//
+//    TableBlockInfo info5 =
+//        new TableBlockInfo(file.getAbsolutePath(), 0, "2", new String[] { "loclhost" },
+//            file.length());
+//    TableBlockInfo info6 =
+//        new TableBlockInfo(file.getAbsolutePath(), 0, "2", new String[] { "loclhost" },
+//            file.length());
+//
+//    TableBlockInfo info7 =
+//        new TableBlockInfo(file.getAbsolutePath(), 0, "3", new String[] { "loclhost" },
+//            file.length());
+//
+//    CarbonTableIdentifier carbonTableIdentifier = new CarbonTableIdentifier("default", "t3", "1");
+//    AbsoluteTableIdentifier absoluteTableIdentifier =
+//        new AbsoluteTableIdentifier("/src/test/resources", carbonTableIdentifier);
+//    ExecutorService executor = Executors.newFixedThreadPool(3);
+//    executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info, info1 }),
+//        absoluteTableIdentifier));
+//    executor.submit(
+//        new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info2, info3, info4 }),
+//            absoluteTableIdentifier));
+//    executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info5, info6 }),
+//        absoluteTableIdentifier));
+//    executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info7 }),
+//        absoluteTableIdentifier));
+//
+//    executor.shutdown();
+//    try {
+//      executor.awaitTermination(1, TimeUnit.DAYS);
+//    } catch (InterruptedException e) {
+//      // TODO Auto-generated catch block
+//      e.printStackTrace();
+//    }
+//    try {
+//      List<AbstractIndex> loadAndGetBlocks = indexStore.loadAndGetBlocks(Arrays
+//              .asList(new TableBlockInfo[] { info, info1, info2, info3, info4, info5, info6, info7 }),
+//          absoluteTableIdentifier);
+//      assertTrue(loadAndGetBlocks.size() == 8);
+//    } catch (IndexBuilderException e) {
+//      assertTrue(false);
+//    }
+//    indexStore.clear(absoluteTableIdentifier);
+//  }
+//
+//  private class BlockLoaderThread implements Callable<Void> {
+//    private List<TableBlockInfo> tableBlockInfoList;
+//    private AbsoluteTableIdentifier absoluteTableIdentifier;
+//
+//    public BlockLoaderThread(List<TableBlockInfo> tableBlockInfoList,
+//        AbsoluteTableIdentifier absoluteTableIdentifier) {
+//      // TODO Auto-generated constructor stub
+//      this.tableBlockInfoList = tableBlockInfoList;
+//      this.absoluteTableIdentifier = absoluteTableIdentifier;
+//    }
+//
+//    @Override public Void call() throws Exception {
+//      indexStore.loadAndGetBlocks(tableBlockInfoList, absoluteTableIdentifier);
+//      return null;
+//    }
+//
+//  }
+//}


[17/20] incubator-carbondata git commit: Fixed issues after merge

Posted by ra...@apache.org.
Fixed issues after merge


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/d5b62e21
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/d5b62e21
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/d5b62e21

Branch: refs/heads/master
Commit: d5b62e218f3148bf71458113d92da70f9f00dc0f
Parents: 9759838
Author: ravipesala <ra...@gmail.com>
Authored: Thu Aug 4 19:39:56 2016 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Aug 4 19:39:56 2016 +0530

----------------------------------------------------------------------
 .../carbondata/spark/util/CarbonScalaUtil.scala | 21 ++++++--------------
 1 file changed, 6 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d5b62e21/integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala
index 8b91745..635418f 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala
@@ -83,21 +83,6 @@ object CarbonScalaUtil extends Logging {
     }
   }
 
-  def getKettleHomePath(sqlContext: SQLContext): String = {
-    val carbonHome = System.getenv("CARBON_HOME")
-    var kettleHomePath: String = null
-    if (carbonHome != null) {
-      kettleHomePath = System.getenv("CARBON_HOME") + "/processing/carbonplugins"
-    }
-    if (kettleHomePath == null) {
-      kettleHomePath = sqlContext.getConf("carbon.kettle.home", null)
-    }
-    if (null == kettleHomePath) {
-      kettleHomePath = CarbonProperties.getInstance.getProperty("carbon.kettle.home")
-    }
-    kettleHomePath
-  }
-
   def updateDataType(
       currentDataType: org.apache.spark.sql.types.DataType): org.apache.spark.sql.types.DataType = {
     currentDataType match {
@@ -133,6 +118,12 @@ object CarbonScalaUtil extends Logging {
     if (null == kettleHomePath) {
       kettleHomePath = CarbonProperties.getInstance.getProperty("carbon.kettle.home")
     }
+    if (null == kettleHomePath) {
+      val carbonHome = System.getenv("CARBON_HOME")
+      if (null != carbonHome) {
+        kettleHomePath = carbonHome + "/processing/carbonplugins"
+      }
+    }
     if (kettleHomePath != null) {
       val sparkMaster = sqlContext.sparkContext.getConf.get("spark.master").toLowerCase()
       // get spark master, if local, need to correct the kettle home


[08/20] incubator-carbondata git commit: [CARBONDATA-136] Fixed Query data mismatch issue after compaction (#903)

Posted by ra...@apache.org.
[CARBONDATA-136] Fixed Query data mismatch issue after compaction (#903)



Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/b327375a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/b327375a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/b327375a

Branch: refs/heads/master
Commit: b327375a39f485574782441b425754aa64904d0d
Parents: 627590e
Author: Kumar Vishal <ku...@gmail.com>
Authored: Thu Aug 4 12:30:50 2016 +0530
Committer: Venkata Ramana G <g....@gmail.com>
Committed: Thu Aug 4 12:30:50 2016 +0530

----------------------------------------------------------------------
 .../impl/InternalDetailQueryExecutor.java       | 20 ++++++++------------
 .../AbstractDetailQueryResultIterator.java      | 14 +++++++++++---
 2 files changed, 19 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b327375a/core/src/main/java/org/carbondata/query/carbon/executor/internal/impl/InternalDetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/executor/internal/impl/InternalDetailQueryExecutor.java b/core/src/main/java/org/carbondata/query/carbon/executor/internal/impl/InternalDetailQueryExecutor.java
index 04cc659..0152a9c 100644
--- a/core/src/main/java/org/carbondata/query/carbon/executor/internal/impl/InternalDetailQueryExecutor.java
+++ b/core/src/main/java/org/carbondata/query/carbon/executor/internal/impl/InternalDetailQueryExecutor.java
@@ -64,19 +64,16 @@ public class InternalDetailQueryExecutor implements InternalQueryExecutor {
 
     // in case of compaction we will pass the in memory record size.
     int inMemoryRecordSizeInModel = queryModel.getInMemoryRecordSize();
-    if(inMemoryRecordSizeInModel > 0){
+    if (inMemoryRecordSizeInModel > 0) {
       recordSize = inMemoryRecordSizeInModel;
-    }
-    else {
+    } else {
       String defaultInMemoryRecordsSize =
           CarbonProperties.getInstance().getProperty(CarbonCommonConstants.INMEMORY_REOCRD_SIZE);
-      if (null != defaultInMemoryRecordsSize) {
-        try {
-          recordSize = Integer.parseInt(defaultInMemoryRecordsSize);
-        } catch (NumberFormatException ne) {
-          LOGGER.error("Invalid inmemory records size. Using default value");
-          recordSize = CarbonCommonConstants.INMEMORY_REOCRD_SIZE_DEFAULT;
-        }
+      try {
+        recordSize = Integer.parseInt(defaultInMemoryRecordsSize);
+      } catch (NumberFormatException ne) {
+        LOGGER.error("Invalid inmemory records size. Using default value");
+        recordSize = CarbonCommonConstants.INMEMORY_REOCRD_SIZE_DEFAULT;
       }
     }
     LOGGER.info("In memory record size considered is: " + recordSize);
@@ -97,8 +94,7 @@ public class InternalDetailQueryExecutor implements InternalQueryExecutor {
    * @param sliceIndexes   slice indexes to be executed
    * @return query result
    */
-  @Override public CarbonIterator<Result> executeQuery(
-      List<BlockExecutionInfo> executionInfos,
+  @Override public CarbonIterator<Result> executeQuery(List<BlockExecutionInfo> executionInfos,
       int[] sliceIndexes) throws QueryExecutionException {
     long startTime = System.currentTimeMillis();
     QueryRunner task;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b327375a/core/src/main/java/org/carbondata/query/carbon/result/iterator/AbstractDetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/result/iterator/AbstractDetailQueryResultIterator.java b/core/src/main/java/org/carbondata/query/carbon/result/iterator/AbstractDetailQueryResultIterator.java
index dc94704..e5ef45c 100644
--- a/core/src/main/java/org/carbondata/query/carbon/result/iterator/AbstractDetailQueryResultIterator.java
+++ b/core/src/main/java/org/carbondata/query/carbon/result/iterator/AbstractDetailQueryResultIterator.java
@@ -97,10 +97,18 @@ public abstract class AbstractDetailQueryResultIterator extends CarbonIterator {
   public AbstractDetailQueryResultIterator(List<BlockExecutionInfo> infos,
       QueryExecutorProperties executerProperties, QueryModel queryModel,
       InternalQueryExecutor queryExecutor) {
+    // below code will be used to update the number of cores based on number
+    // records we
+    // can keep in memory while executing the query execution
     int recordSize = 0;
-    String defaultInMemoryRecordsSize =
-        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.INMEMORY_REOCRD_SIZE);
-    if (null != defaultInMemoryRecordsSize) {
+
+    // in case of compaction we will pass the in memory record size.
+    int inMemoryRecordSizeInModel = queryModel.getInMemoryRecordSize();
+    if (inMemoryRecordSizeInModel > 0) {
+      recordSize = inMemoryRecordSizeInModel;
+    } else {
+      String defaultInMemoryRecordsSize =
+          CarbonProperties.getInstance().getProperty(CarbonCommonConstants.INMEMORY_REOCRD_SIZE);
       try {
         recordSize = Integer.parseInt(defaultInMemoryRecordsSize);
       } catch (NumberFormatException ne) {


[18/20] incubator-carbondata git commit: Merge remote-tracking branch 'apache-master/master' into merge7

Posted by ra...@apache.org.
Merge remote-tracking branch 'apache-master/master' into merge7


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/fd5493b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/fd5493b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/fd5493b9

Branch: refs/heads/master
Commit: fd5493b9dd4ebbba2a7cbec33dd8fc301b1e8316
Parents: d5b62e2 8552a82
Author: ravipesala <ra...@gmail.com>
Authored: Sat Aug 6 14:09:51 2016 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Sat Aug 6 14:09:51 2016 +0530

----------------------------------------------------------------------
 DISCLAIMER                                      |  10 +
 LICENSE                                         |   2 +-
 NOTICE                                          |   8 +
 assembly/pom.xml                                |  70 ++--
 common/pom.xml                                  |  52 +--
 common/src/test/java/log4j.properties           |  17 +
 conf/carbon.properties.template                 |  18 +
 core/CARBON_CORELogResource.properties          |  17 +
 core/pom.xml                                    |  65 +--
 .../carbon/datastore/block/AbstractIndex.java   |  18 +
 .../impl/VariableLengthDimensionDataChunk.java  |  18 +
 .../compression/type/UnCompressDefaultLong.java |  18 +
 .../type/UnCompressMaxMinByteForLong.java       |  18 +
 .../type/UnCompressMaxMinDefaultLong.java       |  18 +
 ...tiDimKeyVarLengthVariableSplitGenerator.java |  18 +
 .../core/util/CarbonMetadataUtil.java           |  18 +
 .../impl/NonFilterQueryScannedResult.java       |  18 +
 .../dictionary/AbstractDictionaryCacheTest.java |  18 +
 .../carbon/datastore/BlockIndexStoreTest.java   |  18 +
 .../datastore/block/SegmentPropertiesTest.java  |  18 +
 .../block/SegmentPropertiesTestUtil.java        |  18 +
 .../impl/btree/BTreeBlockFinderTest.java        |  18 +
 .../carbondata/core/util/CarbonUtilTest.java    |  18 +
 core/src/test/resources/carbonTest.properties   |  17 +
 dev/java-code-format-template.xml               |  18 +
 docs/Carbon-Packaging-and-Interfaces.md         |  19 +
 docs/Carbondata-File-Structure-and-Format.md    |  19 +
 docs/DDL-Operations-on-Carbon.md                |  18 +
 docs/DML-Operations-on-Carbon.md                |  19 +
 docs/Data-Management.md                         |  18 +
 ...stalling-CarbonData-And-IDE-Configuartion.md |  19 +
 examples/CARBON_EXAMPLESLogResource.properties  |  17 +
 examples/pom.xml                                |  69 ++--
 format/pom.xml                                  |  57 +--
 hadoop/CARBON_HADOOPLogResource.properties      |  17 +
 hadoop/pom.xml                                  |  69 ++--
 .../org/carbondata/hadoop/CarbonProjection.java |  18 +
 .../carbondata/hadoop/CarbonRecordReader.java   |  18 +
 .../hadoop/readsupport/CarbonReadSupport.java   |  18 +
 .../AbstractDictionaryDecodedReadSupport.java   |  18 +
 .../impl/ArrayWritableReadSupport.java          |  18 +
 .../impl/DictionaryDecodedReadSupportImpl.java  |  18 +
 .../readsupport/impl/RawDataReadSupport.java    |  18 +
 .../hadoop/util/ObjectSerializationUtil.java    |  18 +
 .../carbondata/hadoop/util/SchemaReader.java    |  18 +
 .../hadoop/ft/CarbonInputMapperTest.java        |  18 +
 integration-testcases/pom.xml                   | 397 ++++++++++---------
 .../spark/sql/common/util/CsvCompare.scala      |  18 +
 .../spark/sql/common/util/DataFrameUtil.scala   |  18 +
 ...CARBON_SPARK_INTERFACELogResource.properties |  17 +
 integration/spark/pom.xml                       |  70 ++--
 .../spark/merger/NodeMultiBlockRelation.java    |  18 +
 .../readsupport/SparkRowReadSupportImpl.java    |  18 +
 .../spark/load/CarbonLoaderUtilTest.java        |  18 +
 .../validation/FileFooterValidator.java         |  18 +
 .../apache/spark/sql/TestCarbonSqlParser.scala  |  18 +
 .../TestLoadDataWithEmptyArrayColumns.scala     |  18 +
 .../dataload/TestLoadDataWithJunkChars.scala    |  18 +
 .../DataCompactionCardinalityBoundryTest.scala  |  18 +
 .../datacompaction/DataCompactionLockTest.scala |  18 +
 .../DataCompactionNoDictionaryTest.scala        |  18 +
 .../datacompaction/DataCompactionTest.scala     |  18 +
 .../MajorCompactionIgnoreInMinorTest.scala      |  18 +
 .../MajorCompactionStopsAfterCompaction.scala   |  18 +
 .../deleteTable/TestDeleteTableNewDDL.scala     |  18 +
 .../ColumnPropertyValidationTestCase.scala      |  18 +
 .../NullMeasureValueTestCaseFilter.scala        |  18 +
 .../NullMeasureValueTestCaseAggregate.scala     |  18 +
 pom.xml                                         | 154 ++++++-
 .../CARBON_PROCESSINGLogResource.properties     |  17 +
 processing/pom.xml                              |  61 +--
 .../org/carbondata/lcm/locks/ZookeeperInit.java |  18 +
 .../csvreaderstep/CustomDataStream.java         |  18 +
 .../mdkeygen/messages/messages_en_US.properties |  17 +
 .../store/messages/messages_en_US.properties    |  17 +
 .../dbbased/messages/messages_en_US.properties  |  17 +
 .../messages/messages_en_US.properties          |  17 +
 .../carbondata/lcm/locks/LocalFileLockTest.java |  17 +-
 .../lcm/locks/ZooKeeperLockingTest.java         |  17 +-
 .../store/colgroup/ColGroupMinMaxTest.java      |  18 +
 80 files changed, 1821 insertions(+), 460 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fd5493b9/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
----------------------------------------------------------------------


[11/20] incubator-carbondata git commit: [CARBONDATA-135] Fixed multiple hdfs client creation issue-Query scan flow (#902)

Posted by ra...@apache.org.
[CARBONDATA-135] Fixed multiple hdfs client creation issue-Query scan flow (#902)



Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/75ddcceb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/75ddcceb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/75ddcceb

Branch: refs/heads/master
Commit: 75ddcceb7bf6e681dd63c0bd082aeaeb6dd7388a
Parents: b4ae2eb
Author: Kumar Vishal <ku...@gmail.com>
Authored: Thu Aug 4 13:03:38 2016 +0530
Committer: Venkata Ramana G <g....@gmail.com>
Committed: Thu Aug 4 13:03:38 2016 +0530

----------------------------------------------------------------------
 .../core/datastorage/store/impl/DFSFileHolderImpl.java         | 4 ++--
 .../carbondata/core/datastorage/store/impl/FileFactory.java    | 6 +++---
 2 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/75ddcceb/core/src/main/java/org/carbondata/core/datastorage/store/impl/DFSFileHolderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/datastorage/store/impl/DFSFileHolderImpl.java b/core/src/main/java/org/carbondata/core/datastorage/store/impl/DFSFileHolderImpl.java
index 653c243..2ffdb5a 100644
--- a/core/src/main/java/org/carbondata/core/datastorage/store/impl/DFSFileHolderImpl.java
+++ b/core/src/main/java/org/carbondata/core/datastorage/store/impl/DFSFileHolderImpl.java
@@ -28,11 +28,11 @@ import org.carbondata.common.logging.LogServiceFactory;
 import org.carbondata.core.constants.CarbonCommonConstants;
 import org.carbondata.core.datastorage.store.FileHolder;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+
 public class DFSFileHolderImpl implements FileHolder {
 
   private static final LogService LOGGER =
@@ -66,7 +66,7 @@ public class DFSFileHolderImpl implements FileHolder {
     try {
       if (null == fileChannel) {
         Path pt = new Path(filePath);
-        FileSystem fs = pt.getFileSystem(new Configuration());
+        FileSystem fs = FileSystem.get(FileFactory.getConfiguration());
         fileChannel = fs.open(pt);
         fileNameAndStreamCache.put(filePath, fileChannel);
       }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/75ddcceb/core/src/main/java/org/carbondata/core/datastorage/store/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/datastorage/store/impl/FileFactory.java b/core/src/main/java/org/carbondata/core/datastorage/store/impl/FileFactory.java
index c88ade8..8005102 100644
--- a/core/src/main/java/org/carbondata/core/datastorage/store/impl/FileFactory.java
+++ b/core/src/main/java/org/carbondata/core/datastorage/store/impl/FileFactory.java
@@ -124,7 +124,7 @@ public final class FileFactory {
       case HDFS:
       case VIEWFS:
         Path pt = new Path(path);
-        FileSystem fs = pt.getFileSystem(configuration);
+        FileSystem fs = FileSystem.get(configuration);
         FSDataInputStream stream = fs.open(pt);
         return new DataInputStream(new BufferedInputStream(stream));
       default:
@@ -141,7 +141,7 @@ public final class FileFactory {
       case HDFS:
       case VIEWFS:
         Path pt = new Path(path);
-        FileSystem fs = pt.getFileSystem(configuration);
+        FileSystem fs = FileSystem.get(configuration);
         FSDataInputStream stream = fs.open(pt, bufferSize);
         return new DataInputStream(new BufferedInputStream(stream));
       default:
@@ -166,7 +166,7 @@ public final class FileFactory {
       case HDFS:
       case VIEWFS:
         Path pt = new Path(path);
-        FileSystem fs = pt.getFileSystem(configuration);
+        FileSystem fs = FileSystem.get(configuration);
         FSDataInputStream stream = fs.open(pt, bufferSize);
         stream.seek(offset);
         return new DataInputStream(new BufferedInputStream(stream));


[05/20] incubator-carbondata git commit: [CARBONDATA-111] No retry when compaction failure, so that it will not continuously retry (#896)

Posted by ra...@apache.org.
[CARBONDATA-111] No retry when compaction failure, so that it will not continuously retry (#896)

if any thing fails in rdd and compaction is failed then throwing exception so that it will not continuously retry

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/ac8d866c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/ac8d866c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/ac8d866c

Branch: refs/heads/master
Commit: ac8d866ca76596a3054b3bd752894c38685658b0
Parents: f9c2e57
Author: ravikiran23 <ra...@gmail.com>
Authored: Tue Aug 2 01:08:40 2016 +0530
Committer: Venkata Ramana G <g....@gmail.com>
Committed: Tue Aug 2 01:08:40 2016 +0530

----------------------------------------------------------------------
 .../spark/src/main/scala/org/carbondata/spark/rdd/Compactor.scala   | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ac8d866c/integration/spark/src/main/scala/org/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/Compactor.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/Compactor.scala
index 2b17d57..45e3ac8 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/Compactor.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/Compactor.scala
@@ -148,6 +148,7 @@ object Compactor {
         .error("Compaction request failed for table " + carbonLoadModel
           .getDatabaseName + "." + carbonLoadModel.getTableName
         )
+      throw new Exception("Compaction Failure in Merger Rdd.")
     }
   }
 }