You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/12/11 06:21:05 UTC

carbondata git commit: [CARBONDATA-2838] Added SDV test cases for Local Dictionary Support

Repository: carbondata
Updated Branches:
  refs/heads/master 12c0d8ae4 -> e2f2b6cd1


[CARBONDATA-2838] Added SDV test cases for Local Dictionary Support

Added SDV test cases for Local Dictionary Support

This closes #2617


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e2f2b6cd
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e2f2b6cd
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e2f2b6cd

Branch: refs/heads/master
Commit: e2f2b6cd19bdaf1efb5b24e63d183a8bfdc667cc
Parents: 12c0d8a
Author: praveenmeenakshi56 <pr...@gmail.com>
Authored: Wed Aug 8 11:45:49 2018 +0530
Committer: kunal642 <ku...@gmail.com>
Committed: Tue Dec 11 11:49:09 2018 +0530

----------------------------------------------------------------------
 .../sdv/generated/AlterTableTestCase.scala      |   2 +-
 .../LoadTableWithLocalDictionaryTestCase.scala  | 325 +++++++++++++++++++
 .../cluster/sdv/suite/SDVSuites.scala           |   7 +-
 3 files changed, 331 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2f2b6cd/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala
index 90fa602..2cf1794 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala
@@ -1017,7 +1017,7 @@ class AlterTableTestCase extends QueryTest with BeforeAndAfterAll {
   }
 
   val prop = CarbonProperties.getInstance()
-  val p1 = prop.getProperty("carbon.horizontal.compaction.enable", CarbonCommonConstants.defaultIsHorizontalCompactionEnabled)
+  val p1 = prop.getProperty("carbon.horizontal.compaction.enable", CarbonCommonConstants.CARBON_HORIZONTAL_COMPACTION_ENABLE_DEFAULT)
   val p2 = prop.getProperty("carbon.horizontal.update.compaction.threshold", CarbonCommonConstants.DEFAULT_UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION)
   val p3 = prop.getProperty("carbon.horizontal.delete.compaction.threshold", CarbonCommonConstants.DEFAULT_DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION)
   val p4 = prop.getProperty("carbon.compaction.level.threshold", CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2f2b6cd/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/LoadTableWithLocalDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/LoadTableWithLocalDictionaryTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/LoadTableWithLocalDictionaryTestCase.scala
new file mode 100644
index 0000000..199358b
--- /dev/null
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/LoadTableWithLocalDictionaryTestCase.scala
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.localdictionary
+
+import scala.collection.JavaConverters._
+import java.io.{File, PrintWriter}
+import java.util
+import java.util.Collections
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, Ignore}
+
+import org.apache.carbondata.core.cache.dictionary.DictionaryByteArrayWrapper
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.block.TableBlockInfo
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk
+import org.apache.carbondata.core.datastore.chunk.reader.CarbonDataReaderFactory
+import org.apache.carbondata.core.datastore.chunk.reader.dimension.v3.CompressedDimensionChunkFileBasedReaderV3
+import org.apache.carbondata.core.datastore.compression.CompressorFactory
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion
+import org.apache.carbondata.core.util.{CarbonMetadataUtil, CarbonProperties, DataFileFooterConverterV3}
+
+class LoadTableWithLocalDictionaryTestCase extends QueryTest with BeforeAndAfterAll {
+
+  val file1 = resourcesPath + "/local_dictionary_source1.csv"
+
+  val file2 = resourcesPath + "/local_dictionary_complex_data.csv"
+
+  val storePath = warehouse + "/local2/Fact/Part0/Segment_0"
+
+  override protected def beforeAll(): Unit = {
+    CarbonProperties.getInstance.addProperty(CarbonCommonConstants.BLOCKLET_SIZE, "10000")
+    createFile(file1)
+    createComplexData(file2)
+    sql("drop table if exists local2")
+  }
+
+  test("test LocalDictionary Load For FallBackScenario"){
+    sql("drop table if exists local2")
+    sql(
+      "CREATE TABLE local2(name string) STORED BY 'carbondata' tblproperties" +
+      "('local_dictionary_threshold'='2001','local_dictionary_include'='name')")
+    sql("load data inpath '" + file1 + "' into table local2 OPTIONS('header'='false')")
+    assert(!checkForLocalDictionary(getDimRawChunk(0)))
+  }
+
+  test("test successful local dictionary generation"){
+    sql("drop table if exists local2")
+    sql(
+      "CREATE TABLE local2(name string) STORED BY 'carbondata' tblproperties" +
+      "('local_dictionary_enable'='true','local_dictionary_threshold'='9001','local_dictionary_include'='name')")
+    sql("load data inpath '" + file1 + "' into table local2 OPTIONS('header'='false')")
+    assert(checkForLocalDictionary(getDimRawChunk(0)))
+  }
+
+  test("test successful local dictionary generation for default configs") {
+    sql("drop table if exists local2")
+    sql("CREATE TABLE local2(name string) STORED BY 'carbondata' tblproperties('local_dictionary_enable'='true')")
+    sql("load data inpath '" + file1 + "' into table local2 OPTIONS('header'='false')")
+    assert(checkForLocalDictionary(getDimRawChunk(0)))
+  }
+
+  test("test local dictionary generation for local dictioanry include") {
+    sql("drop table if exists local2")
+    sql(
+      "CREATE TABLE local2(name string) STORED BY 'carbondata' tblproperties" +
+      "('dictionary_include'='name')")
+    sql("load data inpath '" + file1 + "' into table local2 OPTIONS('header'='false')")
+    assert(!checkForLocalDictionary(getDimRawChunk(0)))
+  }
+
+  test("test local dictionary generation for local dictioanry exclude"){
+    sql("drop table if exists local2")
+    sql(
+      "CREATE TABLE local2(name string) STORED BY 'carbondata' tblproperties" +
+      "('local_dictionary_enable'='true','dictionary_exclude'='name')")
+    sql("load data inpath '" + file1 + "' into table local2 OPTIONS('header'='false')")
+    assert(checkForLocalDictionary(getDimRawChunk(0)))
+  }
+
+  test("test local dictionary generation when it is disabled"){
+    sql("drop table if exists local2")
+    sql(
+      "CREATE TABLE local2(name string) STORED BY 'carbondata' tblproperties" +
+      "('local_dictionary_enable'='false','local_dictionary_include'='name')")
+    sql("load data inpath '" + file1 + "' into table local2 OPTIONS('header'='false')")
+    assert(!checkForLocalDictionary(getDimRawChunk(0)))
+  }
+
+  test("test local dictionary generation for invalid threshold configurations"){
+    sql("drop table if exists local2")
+    sql(
+      "CREATE TABLE local2(name string) STORED BY 'carbondata' tblproperties" +
+      "('local_dictionary_enable'='true','local_dictionary_include'='name','local_dictionary_threshold'='300000')")
+    sql("load data inpath '" + file1 + "' into table local2 OPTIONS('header'='false')")
+    assert(checkForLocalDictionary(getDimRawChunk(0)))
+  }
+
+  test("test local dictionary generation for include and exclude"){
+    sql("drop table if exists local2")
+    sql(
+      "CREATE TABLE local2(name string, age string) STORED BY 'carbondata' tblproperties" +
+      "('local_dictionary_enable'='true','local_dictionary_include'='name', 'local_dictionary_exclude'='age')")
+    sql("insert into table local2 values('vishal', '30')")
+    assert(checkForLocalDictionary(getDimRawChunk(0)))
+    assert(!checkForLocalDictionary(getDimRawChunk(1)))
+  }
+
+  test("test local dictionary generation for complex type"){
+    sql("drop table if exists local2")
+    sql(
+      "CREATE TABLE local2(name struct<i:string,s:string>) STORED BY 'carbondata' tblproperties" +
+      "('local_dictionary_enable'='true','local_dictionary_include'='name')")
+    sql("load data inpath '" + file2 +
+        "' into table local2 OPTIONS('header'='false','COMPLEX_DELIMITER_LEVEL_1'='$', " +
+        "'COMPLEX_DELIMITER_LEVEL_2'=':')")
+    assert(!checkForLocalDictionary(getDimRawChunk(0)))
+    assert(checkForLocalDictionary(getDimRawChunk(1)))
+    assert(checkForLocalDictionary(getDimRawChunk(2)))
+  }
+
+  test("test local dictionary data validation") {
+    sql("drop table if exists local_query_enable")
+    sql("drop table if exists local_query_disable")
+    sql(
+      "CREATE TABLE local_query_enable(name string) STORED BY 'carbondata' tblproperties" +
+      "('local_dictionary_enable'='false','local_dictionary_include'='name')")
+    sql("load data inpath '" + file1 + "' into table local_query_enable OPTIONS('header'='false')")
+    sql(
+      "CREATE TABLE local_query_disable(name string) STORED BY 'carbondata' tblproperties" +
+      "('local_dictionary_enable'='true','local_dictionary_include'='name')")
+    sql("load data inpath '" + file1 + "' into table local_query_disable OPTIONS('header'='false')")
+    checkAnswer(sql("select name from local_query_enable"), sql("select name from local_query_disable"))
+  }
+
+  test("test to validate local dictionary values"){
+    sql("drop table if exists local2")
+    sql("CREATE TABLE local2(name string) STORED BY 'carbondata' tblproperties('local_dictionary_enable'='true')")
+    sql("load data inpath '" + resourcesPath + "/localdictionary.csv" + "' into table local2")
+    val dimRawChunk = getDimRawChunk(0)
+    val dictionaryData = Array("vishal", "kumar", "akash", "praveen", "brijoo")
+    try
+      assert(validateDictionary(dimRawChunk.get(0), dictionaryData))
+    catch {
+      case e: Exception =>
+        assert(false)
+    }
+  }
+
+  override protected def afterAll(): Unit = {
+    sql("drop table if exists local2")
+    deleteFile(file1)
+    deleteFile(file2)
+    CarbonProperties.getInstance
+      .addProperty(CarbonCommonConstants.BLOCKLET_SIZE,
+        CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL)
+  }
+
+  /**
+   * create the csv data file
+   *
+   * @param fileName
+   */
+  private def createFile(fileName: String, line: Int = 9000, start: Int = 0): Unit = {
+    val writer = new PrintWriter(new File(fileName))
+    val data: util.ArrayList[String] = new util.ArrayList[String]()
+    for (i <- start until line) {
+      data.add("n" + i)
+    }
+    Collections.sort(data)
+    data.asScala.foreach { eachdata =>
+      writer.println(eachdata)
+    }
+    writer.close()
+  }
+
+  /**
+   * create the csv for complex data
+   *
+   * @param fileName
+   */
+  private def createComplexData(fileName: String, line: Int = 9000, start: Int = 0): Unit = {
+    val writer = new PrintWriter(new File(fileName))
+    val data: util.ArrayList[String] = new util.ArrayList[String]()
+    for (i <- start until line) {
+      data.add("n" + i + "$" + (i + 1))
+    }
+    Collections.sort(data)
+    data.asScala.foreach { eachdata =>
+      writer.println(eachdata)
+    }
+    writer.close()
+  }
+
+  /**
+   * delete csv file after test execution
+   *
+   * @param fileName
+   */
+  private def deleteFile(fileName: String): Unit = {
+    val file: File = new File(fileName)
+    if (file.exists) file.delete
+  }
+
+  private def checkForLocalDictionary(dimensionRawColumnChunks: util
+  .List[DimensionRawColumnChunk]): Boolean = {
+    var isLocalDictionaryGenerated = false
+    import scala.collection.JavaConversions._
+    for (dimensionRawColumnChunk <- dimensionRawColumnChunks) {
+      if (dimensionRawColumnChunk.getDataChunkV3
+        .isSetLocal_dictionary) {
+        isLocalDictionaryGenerated = true
+      }
+    }
+    isLocalDictionaryGenerated
+  }
+
+  /**
+   * this method returns true if local dictionary is created for all the blocklets or not
+   *
+   * @return
+   */
+  private def getDimRawChunk(blockindex: Int): util.ArrayList[DimensionRawColumnChunk] = {
+    val dataFiles = FileFactory.getCarbonFile(storePath)
+      .listFiles(new CarbonFileFilter() {
+        override def accept(file: CarbonFile): Boolean = {
+          if (file.getName
+            .endsWith(CarbonCommonConstants.FACT_FILE_EXT)) {
+            true
+          } else {
+            false
+          }
+        }
+      })
+    val dimensionRawColumnChunks = read(dataFiles(0).getAbsolutePath,
+      blockindex)
+    dimensionRawColumnChunks
+  }
+
+  private def read(filePath: String, blockIndex: Int) = {
+    val carbonDataFiles = new File(filePath)
+    val dimensionRawColumnChunks = new
+        util.ArrayList[DimensionRawColumnChunk]
+    val offset = carbonDataFiles.length
+    val converter = new DataFileFooterConverterV3
+    val fileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath))
+    val actualOffset = fileReader.readLong(carbonDataFiles.getAbsolutePath, offset - 8)
+    val blockInfo = new TableBlockInfo(carbonDataFiles.getAbsolutePath,
+      actualOffset,
+      "0",
+      new Array[String](0),
+      carbonDataFiles.length,
+      ColumnarFormatVersion.V3,
+      null)
+    val dataFileFooter = converter.readDataFileFooter(blockInfo)
+    val blockletList = dataFileFooter.getBlockletList
+    import scala.collection.JavaConversions._
+    for (blockletInfo <- blockletList) {
+      val dimensionColumnChunkReader =
+        CarbonDataReaderFactory
+          .getInstance
+          .getDimensionColumnChunkReader(ColumnarFormatVersion.V3,
+            blockletInfo,
+            dataFileFooter.getSegmentInfo.getColumnCardinality,
+            carbonDataFiles.getAbsolutePath,
+            false).asInstanceOf[CompressedDimensionChunkFileBasedReaderV3]
+      dimensionRawColumnChunks
+        .add(dimensionColumnChunkReader.readRawDimensionChunk(fileReader, blockIndex))
+    }
+    dimensionRawColumnChunks
+  }
+
+  private def validateDictionary(rawColumnPage: DimensionRawColumnChunk,
+      data: Array[String]): Boolean = {
+    val local_dictionary = rawColumnPage.getDataChunkV3.local_dictionary
+    if (null != local_dictionary) {
+      val compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta(
+        rawColumnPage.getDataChunkV3.getData_chunk_list.get(0).getChunk_meta)
+      val encodings = local_dictionary.getDictionary_meta.encoders
+      val encoderMetas = local_dictionary.getDictionary_meta.getEncoder_meta
+      val encodingFactory = DefaultEncodingFactory.getInstance
+      val decoder = encodingFactory.createDecoder(encodings, encoderMetas, compressorName)
+      val dictionaryPage = decoder
+        .decode(local_dictionary.getDictionary_data, 0, local_dictionary.getDictionary_data.length)
+      val dictionaryMap = new
+          util.HashMap[DictionaryByteArrayWrapper, Integer]
+      val usedDictionaryValues = util.BitSet
+        .valueOf(CompressorFactory.getInstance.getCompressor(compressorName)
+          .unCompressByte(local_dictionary.getDictionary_values))
+      var index = 0
+      var i = usedDictionaryValues.nextSetBit(0)
+      while ( { i >= 0 }) {
+        dictionaryMap
+          .put(new DictionaryByteArrayWrapper(dictionaryPage.getBytes({ index += 1; index - 1 })),
+            i)
+        i = usedDictionaryValues.nextSetBit(i + 1)
+      }
+      for (i <- data.indices) {
+        if (null == dictionaryMap.get(new DictionaryByteArrayWrapper(data(i).getBytes))) {
+          return false
+        }
+      }
+      return true
+    }
+    false
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2f2b6cd/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala
index 6756468..3993042 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala
@@ -21,6 +21,7 @@ import org.scalatest.{BeforeAndAfterAll, Suites}
 
 import org.apache.carbondata.cluster.sdv.generated._
 import org.apache.carbondata.cluster.sdv.register.TestRegisterCarbonTable
+import org.apache.carbondata.spark.testsuite.localdictionary.LoadTableWithLocalDictionaryTestCase
 
 /**
  * Suite class for all tests.
@@ -63,7 +64,8 @@ class SDVSuites extends Suites with BeforeAndAfterAll {
                               new TimeSeriesPreAggregateTestCase ::
                               new TestPartitionWithGlobalSort ::
                               new PartitionWithPreAggregateTestCase ::
-                              new CreateTableWithLocalDictionaryTestCase :: Nil
+                              new CreateTableWithLocalDictionaryTestCase ::
+                              new LoadTableWithLocalDictionaryTestCase :: Nil
 
   override val nestedSuites = suites.toIndexedSeq
 
@@ -154,7 +156,8 @@ class SDVSuites3 extends Suites with BeforeAndAfterAll {
                     new SDKwriterTestCase ::
                     new SetParameterTestCase ::
                     new PartitionWithPreAggregateTestCase ::
-                    new CreateTableWithLocalDictionaryTestCase :: Nil
+                    new CreateTableWithLocalDictionaryTestCase ::
+                    new LoadTableWithLocalDictionaryTestCase :: Nil
 
   override val nestedSuites = suites.toIndexedSeq