You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2018/02/27 01:32:15 UTC
[2/5] carbondata git commit: [HOTFIX] Add java doc for datamap
interface
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ef3031d0/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGIndexDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGIndexDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGIndexDataMapTestCase.scala
deleted file mode 100644
index 5e944fb..0000000
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGIndexDataMapTestCase.scala
+++ /dev/null
@@ -1,379 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark.testsuite.datamap
-
-import java.io.{ByteArrayInputStream, DataOutputStream, ObjectInputStream, ObjectOutputStream}
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-
-import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream
-import org.apache.spark.sql.test.util.QueryTest
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
-import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMapModel}
-import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainIndexDataMap, AbstractCoarseGrainIndexDataMapFactory}
-import org.apache.carbondata.core.datastore.FileReader
-import org.apache.carbondata.core.datastore.block.SegmentProperties
-import org.apache.carbondata.core.datastore.compression.SnappyCompressor
-import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.datastore.page.ColumnPage
-import org.apache.carbondata.core.indexstore.Blocklet
-import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
-import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
-import org.apache.carbondata.core.scan.expression.Expression
-import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression
-import org.apache.carbondata.core.scan.filter.intf.ExpressionType
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
-import org.apache.carbondata.core.util.ByteUtil
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.events.Event
-import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlobalSortBigFileTest
-
-/**
- * Test-only coarse-grain index datamap factory. Its writer (CGDataMapWriter) records per-blocklet
- * value bounds of the first index column into `.datamap` files inside the segment directory, and
- * its datamaps (CGIndexDataMap) read those files back to prune blocklets on equality filters.
- */
-class CGIndexDataMapFactory extends AbstractCoarseGrainIndexDataMapFactory {
- var identifier: AbsoluteTableIdentifier = _
- var dataMapSchema: DataMapSchema = _
-
- /**
- * Initialization of Datamap factory with the identifier and datamap name
- */
- override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = {
- this.identifier = identifier
- this.dataMapSchema = dataMapSchema
- }
-
- /**
- * Return a new writer for this datamap
- */
- override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter = {
- new CGDataMapWriter(identifier, segmentId, dataWritePath, dataMapSchema)
- }
-
- /**
- * Get the datamap for segmentid: lists every `.datamap` file directly under the segment
- * directory and loads one CGIndexDataMap per file.
- */
- override def getDataMaps(segmentId: String) = {
- val file = FileFactory.getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
-
- val files = file.listFiles(new CarbonFileFilter {
- override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
- })
- files.map {f =>
- val dataMap: AbstractCoarseGrainIndexDataMap = new CGIndexDataMap()
- dataMap.init(new DataMapModel(f.getCanonicalPath))
- dataMap
- }.toList.asJava
- }
-
-
- /**
- * Get datamaps for distributable object: loads the single datamap whose file path is carried
- * by the BlockletDataMapDistributable (as produced by toDistributable).
- */
- override def getDataMaps(
- distributable: DataMapDistributable): java.util.List[AbstractCoarseGrainIndexDataMap] = {
- val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable]
- val dataMap: AbstractCoarseGrainIndexDataMap = new CGIndexDataMap()
- dataMap.init(new DataMapModel(mapDistributable.getFilePath))
- Seq(dataMap).asJava
- }
-
- /**
- * Not implemented: `???` throws scala.NotImplementedError if this is ever called.
- *
- * @param event
- */
- override def fireEvent(event: Event): Unit = {
- ???
- }
-
- /**
- * Get all distributable objects of a segmentid: one BlockletDataMapDistributable per
- * `.datamap` file in the segment directory, carrying that file's canonical path.
- *
- * @return
- */
- override def toDistributable(segmentId: String) = {
- val file = FileFactory.getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
-
- val files = file.listFiles(new CarbonFileFilter {
- override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
- })
- files.map { f =>
- val d:DataMapDistributable = new BlockletDataMapDistributable(f.getCanonicalPath)
- d
- }.toList.asJava
- }
-
-
- /**
- * Clears datamap of the segment. No-op: this factory caches no datamaps.
- */
- override def clear(segmentId: String): Unit = {
-
- }
-
- /**
- * Clear all datamaps from memory. No-op: this factory caches no datamaps.
- */
- override def clear(): Unit = {
-
- }
-
- /**
- * Return metadata of this datamap: the index columns are the comma-separated `indexcolumns`
- * DMPROPERTY, and EQUALS and IN are the advertised filter types.
- * NOTE(review): if `indexcolumns` is absent, `get` presumably returns null and `split` throws
- * a NullPointerException. Confirm.
- */
- override def getMeta: DataMapMeta = {
- new DataMapMeta(dataMapSchema.getProperties.get("indexcolumns").split(",").toList.asJava,
- List(ExpressionType.EQUALS, ExpressionType.IN).asJava)
- }
-}
-
-/**
- * Test-only coarse-grain datamap over one `.datamap` file written by CGDataMapWriter. It keeps
- * the per-blocklet (block id, blocklet id, (max, min)) list in memory and prunes on the literal
- * of the first equality filter.
- */
-class CGIndexDataMap extends AbstractCoarseGrainIndexDataMap {
-
- // Entries are (block id, blocklet id, (max, min)); CGDataMapWriter stores sorted.last first.
- var maxMin: ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))] = _
- var FileReader: FileReader = _
- var filePath: String = _
- val compressor = new SnappyCompressor
-
- /**
- * It is called to load the data map to memory or to initialize it.
- * The file ends with the compressed, Java-serialized `maxMin` list followed by a 4-byte int
- * holding that compressed length (see CGDataMapWriter.finish).
- */
- override def init(dataMapModel: DataMapModel): Unit = {
- this.filePath = dataMapModel.getFilePath
- val size = FileFactory.getCarbonFile(filePath).getSize
- FileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath))
- val footerLen = FileReader.readInt(filePath, size-4)
- val bytes = FileReader.readByteArray(filePath, size-footerLen-4, footerLen)
- val in = new ByteArrayInputStream(compressor.unCompressByte(bytes))
- val obj = new ObjectInputStream(in)
- maxMin = obj.readObject().asInstanceOf[ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))]]
- }
-
- /**
- * Prune the datamap with filter expression. It returns the list of
- * blocklets where these filters can exist.
- * Only the first collected EqualToExpression is used. Its second child is evaluated as the
- * literal (presumably the right-hand constant; TODO confirm).
- * NOTE(review): `value(0)` throws if the filter contains no EqualToExpression.
- *
- * @param filterExp
- * @return
- */
- override def prune(
- filterExp: FilterResolverIntf,
- segmentProperties: SegmentProperties,
- partitions: java.util.List[String]): java.util.List[Blocklet] = {
- val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]()
- val expression = filterExp.getFilterExpression
- getEqualToExpression(expression, buffer)
- val value = buffer.map { f =>
- f.getChildren.get(1).evaluate(null).getString
- }
- val meta = findMeta(value(0).getBytes)
- meta.map { f=>
- new Blocklet(f._1, f._2+"")
- }.asJava
- }
-
-
- // Keeps blocklets whose bounds contain `value`: value <= _3._1 (max) and value >= _3._2 (min).
- private def findMeta(value: Array[Byte]) = {
- val tuples = maxMin.filter { f =>
- ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._1) <= 0 &&
- ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._2) >= 0
- }
- tuples
- }
-
- // Collects EqualToExpressions from the expression tree into `buffer`.
- // NOTE(review): a direct EqualTo child is appended twice (in the loop and again by the recursive
- // call). This is harmless here because prune only reads buffer(0).
- private def getEqualToExpression(expression: Expression, buffer: ArrayBuffer[Expression]): Unit = {
- if (expression.isInstanceOf[EqualToExpression]) {
- buffer += expression
- } else {
- if (expression.getChildren != null) {
- expression.getChildren.asScala.map { f =>
- if (f.isInstanceOf[EqualToExpression]) {
- buffer += f
- }
- getEqualToExpression(f, buffer)
- }
- }
- }
- }
-
- /**
- * Clear complete index table and release memory.
- * Not implemented: `???` throws scala.NotImplementedError.
- */
- override def clear() = {
- ???
- }
-
- // Not implemented: `???` throws scala.NotImplementedError.
- override def isScanRequired(filterExp: FilterResolverIntf): Boolean = ???
-}
-
-/**
- * Test-only writer for CGIndexDataMap. It keeps the min and max value of the first index column
- * for every page, reduces them to one (max, min) pair per blocklet, and writes the list as a
- * compressed footer when the writer finishes.
- */
-class CGDataMapWriter(identifier: AbsoluteTableIdentifier,
- segmentId: String,
- dataWritePath: String,
- dataMapSchema: DataMapSchema)
- extends AbstractDataMapWriter(identifier, segmentId, dataWritePath) {
-
- var currentBlockId: String = null
- // System.nanoTime() keeps file names from different writers of the same datamap apart.
- val cgwritepath = dataWritePath + "/" +
- dataMapSchema.getDataMapName + System.nanoTime() + ".datamap"
- // Lazy: the output file is only opened on first use in finish().
- lazy val stream: DataOutputStream = FileFactory
- .getDataOutputStream(cgwritepath, FileFactory.getFileType(cgwritepath))
- // Page-level min and max values collected for the current blocklet.
- val blockletList = new ArrayBuffer[Array[Byte]]()
- // One (block id, blocklet id, (max, min)) entry per finished blocklet.
- val maxMin = new ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))]()
- val compressor = new SnappyCompressor
-
- /**
- * Start of new block notification.
- *
- * @param blockId file name of the carbondata file
- */
- override def onBlockStart(blockId: String): Unit = {
- currentBlockId = blockId
- }
-
- /**
- * End of block notification
- */
- override def onBlockEnd(blockId: String): Unit = {
-
- }
-
- /**
- * Start of new blocklet notification.
- *
- * @param blockletId sequence number of blocklet in the block
- */
- override def onBlockletStart(blockletId: Int): Unit = {
-
- }
-
- /**
- * End of blocklet notification
- * Stores (max, min) of the collected page bounds (sorted.last first) and resets the buffer.
- * NOTE(review): sortWith is given `<= 0`, which is not a strict less-than. Sorting is also only
- * needed for head/last, so a single min/max pass would do. sorted.last throws if no page was
- * added to this blocklet.
- *
- * @param blockletId sequence number of blocklet in the block
- */
- override def onBlockletEnd(blockletId: Int): Unit = {
- val sorted = blockletList
- .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l, r) <= 0)
- maxMin +=
- ((currentBlockId+"", blockletId, (sorted.last, sorted.head)))
- blockletList.clear()
- }
-
- /**
- * Add the column pages row to the datamap, order of pages is same as `indexColumns` in
- * DataMapMeta returned in IndexDataMapFactory.
- *
- * Implementation should copy the content of `pages` as needed, because `pages` memory
- * may be freed after this method returns, if using unsafe column page.
- *
- * Only pages(0) is read. The first 2 bytes of every value are dropped (presumably a length
- * prefix; TODO confirm), and the page's min and max are appended to blockletList.
- */
- override def onPageAdded(blockletId: Int,
- pageId: Int,
- pages: Array[ColumnPage]): Unit = {
- val size = pages(0).getPageSize
- val list = new ArrayBuffer[Array[Byte]]()
- var i = 0
- while (i < size) {
- val bytes = pages(0).getBytes(i)
- val newBytes = new Array[Byte](bytes.length - 2)
- System.arraycopy(bytes, 2, newBytes, 0, newBytes.length)
- list += newBytes
- i = i + 1
- }
- // Sort based on the column data in order to create index.
- val sorted = list
- .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l, r) <= 0)
- blockletList += sorted.head
- blockletList += sorted.last
- }
-
-
- /**
- * This is called during closing of writer.So after this call no more data will be sent to this
- * class.
- * File layout written here: [compressed serialized maxMin][int: compressed length].
- * NOTE(review): ByteOutputStream is a JDK-internal class (com.sun.xml.internal...).
- * java.io.ByteArrayOutputStream#toByteArray would avoid it. Confirm whether getBytes returns
- * only the bytes actually written.
- */
- override def finish(): Unit = {
- val out = new ByteOutputStream()
- val outStream = new ObjectOutputStream(out)
- outStream.writeObject(maxMin)
- outStream.close()
- val bytes = compressor.compressByte(out.getBytes)
- stream.write(bytes)
- stream.writeInt(bytes.length)
- stream.close()
- commitFile(cgwritepath)
- }
-
-
-}
-
-/**
- * End-to-end check that queries on tables with CGIndexDataMapFactory datamaps return the same
- * rows as the same query on a table without datamaps (`normal_test`).
- */
-class CGIndexDataMapTestCase extends QueryTest with BeforeAndAfterAll {
-
- val file2 = resourcesPath + "/compaction/fil2.csv"
- override protected def beforeAll(): Unit = {
- //n should be about 5000000 of reset if size is default 1024
- // NOTE(review): the intent of the note above is unclear. It presumably refers to the row count
- // needed for multiple blocklets at a default size. TODO confirm.
- val n = 150000
- CompactionSupportGlobalSortBigFileTest.createFile(file2, n * 4, n)
- sql("DROP TABLE IF EXISTS normal_test")
- sql(
- """
- | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT)
- | STORED BY 'org.apache.carbondata.format'
- | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
- """.stripMargin)
- sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')")
- }
-
- test("test cg datamap") {
- sql("DROP TABLE IF EXISTS datamap_test_cg")
- sql(
- """
- | CREATE TABLE datamap_test_cg(id INT, name STRING, city STRING, age INT)
- | STORED BY 'org.apache.carbondata.format'
- | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
- """.stripMargin)
- // NOTE(review): `table` is never used.
- val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test_cg")
- // register datamap writer
- sql(s"create datamap cgdatamap on table datamap_test_cg using '${classOf[CGIndexDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='name')")
- sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test_cg OPTIONS('header'='false')")
- checkAnswer(sql("select * from datamap_test_cg where name='n502670'"),
- sql("select * from normal_test where name='n502670'"))
- }
-
- test("test cg datamap with 2 datamaps ") {
- sql("DROP TABLE IF EXISTS datamap_test")
- sql(
- """
- | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
- | STORED BY 'org.apache.carbondata.format'
- | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
- """.stripMargin)
- // NOTE(review): `table` is never used.
- val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test")
- // register datamap writer
- sql(s"create datamap ggdatamap1 on table datamap_test using '${classOf[CGIndexDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='name')")
- sql(s"create datamap ggdatamap2 on table datamap_test using '${classOf[CGIndexDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='city')")
- sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
- checkAnswer(sql("select * from datamap_test where name='n502670' and city='c2670'"),
- sql("select * from normal_test where name='n502670' and city='c2670'"))
- }
-
- // NOTE(review): `datamap_test` (created by the second test) is not dropped here.
- override protected def afterAll(): Unit = {
- CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
- sql("DROP TABLE IF EXISTS normal_test")
- sql("DROP TABLE IF EXISTS datamap_test_cg")
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ef3031d0/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
new file mode 100644
index 0000000..6c13ae9
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.datamap
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.{DataFrame, SaveMode}
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.dev.DataMapWriter
+import org.apache.carbondata.core.datamap.dev.cgdatamap.{CoarseGrainDataMap, CoarseGrainDataMapFactory}
+import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta}
+import org.apache.carbondata.core.datastore.page.ColumnPage
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType
+import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.events.Event
+
+/**
+ * Minimal coarse-grain datamap factory used only to observe writer callbacks: it indexes column
+ * `c2` and hands out the recording writer from the DataMapWriterSuite object.
+ */
+class C2DataMapFactory() extends CoarseGrainDataMapFactory {
+
+  /** Table this factory was initialised for; forwarded to every writer it creates. */
+  var identifier: AbsoluteTableIdentifier = _
+
+  override def init(identifier: AbsoluteTableIdentifier,
+      dataMapSchema: DataMapSchema): Unit = {
+    this.identifier = identifier
+  }
+
+  /** Returns the callback-recording mock writer. */
+  override def createWriter(segmentId: String, dataWritePath: String): DataMapWriter =
+    DataMapWriterSuite.dataMapWriterC2Mock(identifier, segmentId, dataWritePath)
+
+  /** Declares `c2` as the only index column, with EQUALS as the only supported filter. */
+  override def getMeta: DataMapMeta = {
+    val indexColumns = List("c2").asJava
+    val supportedFilters = List(ExpressionType.EQUALS).asJava
+    new DataMapMeta(indexColumns, supportedFilters)
+  }
+
+  // The suite only loads data, so reading and pruning are left unimplemented.
+  override def getDataMaps(segmentId: String): util.List[CoarseGrainDataMap] = ???
+
+  override def getDataMaps(distributable: DataMapDistributable): java.util.List[CoarseGrainDataMap] = ???
+
+  /**
+   * Get all distributable objects of a segmentid (not implemented for this mock).
+   */
+  override def toDistributable(segmentId: String): util.List[DataMapDistributable] = ???
+
+  override def fireEvent(event: Event): Unit = ???
+
+  // Nothing is cached, so clearing is a no-op.
+  override def clear(segmentId: String): Unit = ()
+
+  override def clear(): Unit = ()
+}
+
+class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll {
+  def buildTestData(numRows: Int): DataFrame = {
+    import sqlContext.implicits._
+    sqlContext.sparkContext.parallelize(1 to numRows, 1)
+      .map(x => ("a" + x, "b", x))
+      .toDF("c1", "c2", "c3")
+  }
+
+  def dropTable(): Unit = {
+    sql("DROP TABLE IF EXISTS carbon1")
+    sql("DROP TABLE IF EXISTS carbon2")
+  }
+
+  override def beforeAll {
+    dropTable()
+  }
+
+  /**
+   * Runs `body` starting from an empty DataMapWriterSuite.callbackSeq and always empties it
+   * again afterwards. This way a failed assertion in one test cannot leak its recorded callbacks
+   * into the next test.
+   */
+  private def withFreshCallbacks(body: => Unit): Unit = {
+    DataMapWriterSuite.callbackSeq = Seq()
+    try {
+      body
+    } finally {
+      DataMapWriterSuite.callbackSeq = Seq()
+    }
+  }
+
+  /**
+   * Asserts that the recorded callbacks begin with a "block start" event, end with a
+   * "block end" event, and contain exactly `expectedInner` in between.
+   */
+  private def assertCallbacks(expectedInner: Seq[String]): Unit = {
+    val callbacks = DataMapWriterSuite.callbackSeq
+    assert(callbacks.nonEmpty, "no datamap writer callbacks were recorded")
+    assert(callbacks.head.contains("block start"))
+    assert(callbacks.last.contains("block end"))
+    assert(callbacks.slice(1, callbacks.length - 1) == expectedInner)
+  }
+
+  test("test write datamap 2 pages") {
+    withFreshCallbacks {
+      sql(s"CREATE TABLE carbon1(c1 STRING, c2 STRING, c3 INT) STORED BY 'org.apache.carbondata.format'")
+      // register datamap writer
+      sql(s"CREATE DATAMAP test ON TABLE carbon1 USING '${classOf[C2DataMapFactory].getName}'")
+      val df = buildTestData(33000)
+
+      // save dataframe to carbon file
+      df.write
+        .format("carbondata")
+        .option("tableName", "carbon1")
+        .option("tempCSV", "false")
+        .option("sort_columns","c1")
+        .mode(SaveMode.Overwrite)
+        .save()
+
+      assertCallbacks(Seq(
+        "blocklet start 0",
+        "add page data: blocklet 0, page 0",
+        "add page data: blocklet 0, page 1",
+        "blocklet end: 0"))
+    }
+  }
+
+  test("test write datamap 2 blocklet") {
+    withFreshCallbacks {
+      sql(s"CREATE TABLE carbon2(c1 STRING, c2 STRING, c3 INT) STORED BY 'org.apache.carbondata.format'")
+      sql(s"CREATE DATAMAP test ON TABLE carbon2 USING '${classOf[C2DataMapFactory].getName}'")
+
+      // NOTE(review): these are set on the global CarbonProperties instance and are not restored
+      // afterwards, so they remain in effect for later suites running in the same JVM.
+      CarbonProperties.getInstance()
+        .addProperty("carbon.blockletgroup.size.in.mb", "1")
+      CarbonProperties.getInstance()
+        .addProperty("carbon.number.of.cores.while.loading",
+          CarbonCommonConstants.NUM_CORES_DEFAULT_VAL)
+
+      val df = buildTestData(300000)
+
+      // save dataframe to carbon file
+      df.write
+        .format("carbondata")
+        .option("tableName", "carbon2")
+        .option("tempCSV", "false")
+        .option("sort_columns","c1")
+        .option("SORT_SCOPE","GLOBAL_SORT")
+        .mode(SaveMode.Overwrite)
+        .save()
+
+      // The original note on this test says "carbon.blockletgroup.size.in.mb" cannot be set
+      // below 64 MB. Despite the "1" above, all ten pages of this load are therefore expected
+      // in a single blocklet.
+      val pageEvents = (0 to 9).map(page => s"add page data: blocklet 0, page $page")
+      assertCallbacks(Seq("blocklet start 0") ++ pageEvents ++ Seq("blocklet end: 0"))
+    }
+  }
+
+  override def afterAll {
+    dropTable()
+  }
+}
+
+object DataMapWriterSuite {
+
+  /** Ordered log of every callback received by writers created through dataMapWriterC2Mock. */
+  var callbackSeq: Seq[String] = Seq[String]()
+
+  /** Appends one event description to callbackSeq. */
+  private def recordCallback(event: String): Unit = {
+    callbackSeq = callbackSeq :+ event
+  }
+
+  /**
+   * Builds a DataMapWriter that logs each callback into callbackSeq. It also asserts that every
+   * page batch holds exactly one STRING page whose first row's bytes are `0, 1, 'b'`.
+   */
+  def dataMapWriterC2Mock(identifier: AbsoluteTableIdentifier, segmentId: String,
+      dataWritePath: String) =
+    new DataMapWriter(identifier, segmentId, dataWritePath) {
+
+      override def onBlockStart(blockId: String) = recordCallback(s"block start $blockId")
+
+      override def onBlockEnd(blockId: String): Unit = recordCallback(s"block end $blockId")
+
+      override def onBlockletStart(blockletId: Int): Unit =
+        recordCallback(s"blocklet start $blockletId")
+
+      override def onBlockletEnd(blockletId: Int): Unit =
+        recordCallback(s"blocklet end: $blockletId")
+
+      override def onPageAdded(
+          blockletId: Int,
+          pageId: Int,
+          pages: Array[ColumnPage]): Unit = {
+        assert(pages.length == 1)
+        val page = pages(0)
+        assert(page.getDataType == DataTypes.STRING)
+        val firstRow: Array[Byte] = page.getByteArrayPage()(0)
+        assert(firstRow.sameElements(Seq(0, 1, 'b'.toByte)))
+        recordCallback(s"add page data: blocklet $blockletId, page $pageId")
+      }
+
+      /** Nothing is buffered, so there is nothing to flush when the writer is closed. */
+      override def finish() = {}
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ef3031d0/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
new file mode 100644
index 0000000..84384b7
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.datamap
+
+import java.io.{ByteArrayInputStream, DataOutputStream, ObjectInputStream, ObjectOutputStream}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.datamap.dev.fgdatamap.{FineGrainBlocklet, FineGrainDataMap, FineGrainDataMapFactory}
+import org.apache.carbondata.core.datamap.dev.{DataMapModel, DataMapWriter}
+import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta}
+import org.apache.carbondata.core.datastore.FileReader
+import org.apache.carbondata.core.datastore.block.SegmentProperties
+import org.apache.carbondata.core.datastore.compression.SnappyCompressor
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.datastore.page.ColumnPage
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
+import org.apache.carbondata.core.scan.expression.Expression
+import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
+import org.apache.carbondata.core.util.ByteUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.Event
+import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlobalSortBigFileTest
+
+/**
+ * Test-only fine-grain datamap factory: FGDataMapWriter builds one `.datamap` file per writer in
+ * the segment directory, and FGDataMap reads such a file back for row-level pruning.
+ */
+class FGDataMapFactory extends FineGrainDataMapFactory {
+  var identifier: AbsoluteTableIdentifier = _
+  var dataMapSchema: DataMapSchema = _
+
+  /**
+   * Remembers the table identifier (used to locate segment directories) and the datamap schema
+   * (used for the writer and for getMeta).
+   */
+  override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = {
+    this.identifier = identifier
+    this.dataMapSchema = dataMapSchema
+  }
+
+  /** Creates the writer that builds this test index while a segment is being written. */
+  override def createWriter(segmentId: String, dataWritePath: String): DataMapWriter =
+    new FGDataMapWriter(identifier, segmentId, dataWritePath, dataMapSchema)
+
+  /** Lists every `.datamap` file directly inside the directory of `segmentId`. */
+  private def dataMapFilesOf(segmentId: String): Array[CarbonFile] = {
+    val segmentDir = FileFactory
+      .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
+    segmentDir.listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
+    })
+  }
+
+  /** Creates an FGDataMap and initialises it from the index file at `path`. */
+  private def loadDataMap(path: String): FineGrainDataMap = {
+    val dataMap: FineGrainDataMap = new FGDataMap()
+    dataMap.init(new DataMapModel(path))
+    dataMap
+  }
+
+  /** Loads one datamap for each `.datamap` file of the segment. */
+  override def getDataMaps(segmentId: String): java.util.List[FineGrainDataMap] =
+    dataMapFilesOf(segmentId).toList.map(f => loadDataMap(f.getCanonicalPath)).asJava
+
+  /** Loads the single datamap whose file path is carried by a BlockletDataMapDistributable. */
+  override def getDataMaps(
+      distributable: DataMapDistributable): java.util.List[FineGrainDataMap] = {
+    val filePath = distributable.asInstanceOf[BlockletDataMapDistributable].getFilePath
+    Seq(loadDataMap(filePath)).asJava
+  }
+
+  /**
+   * Wraps each `.datamap` file of the segment in a BlockletDataMapDistributable carrying the
+   * file's canonical path.
+   */
+  override def toDistributable(segmentId: String): java.util.List[DataMapDistributable] =
+    dataMapFilesOf(segmentId).toList.map { f =>
+      new BlockletDataMapDistributable(f.getCanonicalPath): DataMapDistributable
+    }.asJava
+
+  /** Not implemented: `???` throws scala.NotImplementedError if called. */
+  override def fireEvent(event: Event): Unit = ???
+
+  /** No datamaps are cached per segment, so there is nothing to clear. */
+  override def clear(segmentId: String): Unit = ()
+
+  /** No datamaps are cached, so there is nothing to clear. */
+  override def clear(): Unit = ()
+
+  /**
+   * Index columns come from the comma-separated `indexcolumns` DMPROPERTY; EQUALS and IN are
+   * the advertised filter types.
+   */
+  override def getMeta: DataMapMeta = {
+    val indexColumns = dataMapSchema.getProperties.get("indexcolumns").split(",").toList
+    new DataMapMeta(indexColumns.asJava, List(ExpressionType.EQUALS, ExpressionType.IN).asJava)
+  }
+}
+
+/**
+ * Test-only fine-grain datamap backed by one `.datamap` file.
+ *
+ * The file ends with a compressed, Java-serialized list of per-blocklet summaries (see maxMin),
+ * followed by a 4-byte int holding that list's compressed length. Each summary points, by
+ * (offset, length), at a compressed, serialized per-blocklet list of
+ * (value, row ids per page, page ids) entries sorted by value.
+ */
+class FGDataMap extends FineGrainDataMap {
+
+  /**
+   * Per-blocklet summaries: (block id, blocklet id, value bounds, offset of the blocklet's
+   * compressed data in the file, length of that data). findMeta treats the bounds as (min, max).
+   */
+  var maxMin: ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)] = _
+  var FileReader: FileReader = _
+  var filePath: String = _
+  val compressor = new SnappyCompressor
+
+  /**
+   * Reads the footer of the index file at `dataMapModel.getFilePath` and deserializes the
+   * per-blocklet summaries into maxMin.
+   */
+  override def init(dataMapModel: DataMapModel): Unit = {
+    this.filePath = dataMapModel.getFilePath
+    val size = FileFactory.getCarbonFile(filePath).getSize
+    FileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath))
+    // The last 4 bytes hold the compressed footer length; the footer sits right before them.
+    val footerLen = FileReader.readInt(filePath, size - 4)
+    val bytes = FileReader.readByteArray(filePath, size - footerLen - 4, footerLen)
+    val in = new ByteArrayInputStream(compressor.unCompressByte(bytes))
+    val obj = new ObjectInputStream(in)
+    maxMin = obj.readObject()
+      .asInstanceOf[ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)]]
+  }
+
+  /**
+   * Prune the datamap with filter expression. It returns the blocklets, with the matching pages
+   * and row ids, where the literal of the first equality filter occurs.
+   *
+   * The literal is taken from the second child of the first collected EqualToExpression.
+   * As before, this throws IndexOutOfBoundsException when the filter has no equality expression.
+   *
+   * @param filterExp resolved filter whose expression tree is searched for equality filters
+   * @return fine-grain blocklets containing the filter value; empty if none do
+   */
+  override def prune(
+      filterExp: FilterResolverIntf,
+      segmentProperties: SegmentProperties,
+      partitions: java.util.List[String]): java.util.List[FineGrainBlocklet] = {
+    val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]()
+    getEqualToExpression(filterExp.getFilterExpression, buffer)
+    val value = buffer.map { f =>
+      f.getChildren.get(1).evaluate(null).getString
+    }
+    val valueBytes = value(0).getBytes
+    findMeta(valueBytes).flatMap(meta => readAndFindData(meta, valueBytes)).asJava
+  }
+
+  /**
+   * Loads the value -> rows list of one blocklet and binary-searches it for `value`.
+   *
+   * @return the blocklet restricted to the pages and rows holding `value`, or None when the
+   *         blocklet does not contain `value` exactly
+   */
+  private def readAndFindData(meta: (String, Int, (Array[Byte], Array[Byte]), Long, Int),
+      value: Array[Byte]): Option[FineGrainBlocklet] = {
+    val bytes = FileReader.readByteArray(filePath, meta._4, meta._5)
+    val inputStream = new ByteArrayInputStream(compressor.unCompressByte(bytes))
+    val obj = new ObjectInputStream(inputStream)
+    // Entries are (value, row ids per page, page ids), sorted by value.
+    val blockletsData = obj.readObject()
+      .asInstanceOf[ArrayBuffer[(Array[Byte], Seq[Seq[Int]], Seq[Int])]]
+
+    import scala.collection.Searching._
+    val byValue = new Ordering[(Array[Byte], Seq[Seq[Int]], Seq[Int])] {
+      override def compare(x: (Array[Byte], Seq[Seq[Int]], Seq[Int]),
+          y: (Array[Byte], Seq[Seq[Int]], Seq[Int])): Int = {
+        ByteUtil.UnsafeComparer.INSTANCE.compareTo(x._1, y._1)
+      }
+    }
+    val searching = blockletsData
+      .search[(Array[Byte], Seq[Seq[Int]], Seq[Int])]((value, Seq(Seq(0)), Seq(0)))(byValue)
+    // Only an exact match counts. An InsertionPoint means the value is absent: its index points
+    // at a different value, or equals blockletsData.length. The previous
+    // `insertionPoint >= 0` check was always true.
+    searching match {
+      case Found(index) =>
+        val (_, rowIdsPerPage, pageIds) = blockletsData(index)
+        val pages = pageIds.zipWithIndex.map { case (pageId, i) =>
+          val page = new FineGrainBlocklet.Page
+          page.setPageId(pageId)
+          page.setRowId(rowIdsPerPage(i).toArray)
+          page
+        }
+        Some(new FineGrainBlocklet(meta._1, meta._2.toString, pages.toList.asJava))
+      case _ =>
+        None
+    }
+  }
+
+  // Keeps blocklets whose bounds contain `value`: _3._1 <= value <= _3._2.
+  private def findMeta(value: Array[Byte]) = {
+    val tuples = maxMin.filter { f =>
+      ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._1) >= 0 &&
+      ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._2) <= 0
+    }
+    tuples
+  }
+
+  /**
+   * Appends every EqualToExpression of the expression tree to `buffer`, depth-first, each
+   * exactly once. Children of an EqualToExpression are not searched.
+   */
+  def getEqualToExpression(expression: Expression, buffer: ArrayBuffer[Expression]): Unit = {
+    expression match {
+      case equalTo: EqualToExpression =>
+        buffer += equalTo
+      case _ if expression.getChildren != null =>
+        // Recursing is enough: an equality child is added by the first case of the recursive call.
+        expression.getChildren.asScala.foreach(child => getEqualToExpression(child, buffer))
+      case _ =>
+    }
+  }
+
+  /**
+   * Clear complete index table and release memory.
+   * Not implemented: `???` throws scala.NotImplementedError.
+   */
+  override def clear():Unit = {
+    ???
+  }
+
+  // Not implemented: `???` throws scala.NotImplementedError.
+  override def isScanRequired(filterExp: FilterResolverIntf): Boolean = ???
+}
+
+/**
+ * Test fine-grain datamap writer. For each blocklet it builds a sorted index from the value of
+ * the first index column to the (page id, row ids) where that value occurs, and appends the
+ * index to a single `.datamap` file.
+ *
+ * File layout: one Snappy-compressed, Java-serialized
+ * ArrayBuffer[(value, Seq[row ids per page], Seq[page id])] per blocklet, then the compressed,
+ * serialized `maxMin` footer, then the footer length as a 4-byte int.
+ */
+class FGDataMapWriter(identifier: AbsoluteTableIdentifier,
+    segmentId: String, dataWriterPath: String, dataMapSchema: DataMapSchema)
+  extends DataMapWriter(identifier, segmentId, dataWriterPath) {
+
+  var currentBlockId: String = null
+  val fgwritepath = dataWriterPath + "/" + dataMapSchema.getDataMapName + System.nanoTime() +
+                    ".datamap"
+  val stream: DataOutputStream = FileFactory
+    .getDataOutputStream(fgwritepath, FileFactory.getFileType(fgwritepath))
+  /** Per-page index entries of the current blocklet: (value, row ids, Seq(pageId)). */
+  val blockletList = new ArrayBuffer[(Array[Byte], Seq[Int], Seq[Int])]()
+  /** Footer entries: (block id, blocklet id, (min value, max value), offset, length). */
+  val maxMin = new ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)]()
+  var position: Long = 0
+  val compressor = new SnappyCompressor
+
+  private val comparer = ByteUtil.UnsafeComparer.INSTANCE
+
+  // A proper total order for sortBy. The previous sortWith(compareTo(..) <= 0) treated equal
+  // keys as "less than" each other. That violates the comparator contract, so the sort may
+  // throw on pages with duplicate values.
+  private val keyOrdering: Ordering[Array[Byte]] = new Ordering[Array[Byte]] {
+    override def compare(x: Array[Byte], y: Array[Byte]): Int = comparer.compareTo(x, y)
+  }
+
+  /**
+   * Start of new block notification.
+   *
+   * @param blockId file name of the carbondata file
+   */
+  override def onBlockStart(blockId: String): Unit = {
+    currentBlockId = blockId
+  }
+
+  /**
+   * End of block notification
+   */
+  override def onBlockEnd(blockId: String): Unit = {
+
+  }
+
+  /**
+   * Start of new blocklet notification.
+   *
+   * @param blockletId sequence number of blocklet in the block
+   */
+  override def onBlockletStart(blockletId: Int): Unit = {
+
+  }
+
+  /**
+   * End of blocklet notification. Merges the per-page entries of this blocklet by value,
+   * appends the serialized index to the datamap file, and records its min/max value and
+   * file range in `maxMin`.
+   *
+   * @param blockletId sequence number of blocklet in the block
+   */
+  override def onBlockletEnd(blockletId: Int): Unit = {
+    // sortBy is stable, so entries with equal values keep the order their pages were added.
+    val sorted = blockletList.sortBy(_._1)(keyOrdering)
+    val grouped = groupConsecutiveByKey(sorted.map { case (value, rowIds, pageIds) =>
+      (value, (rowIds, pageIds))
+    })
+    // Must remain an ArrayBuffer: the reader casts the deserialized object to ArrayBuffer.
+    // Element i of the row-id list belongs to element i of the page-id list.
+    val blockletListUpdated = grouped.map { case (value, perPage) =>
+      (value, perPage.map(_._1), perPage.flatMap(_._2))
+    }
+
+    // Guard against an empty blocklet: head/last below would otherwise throw.
+    if (blockletListUpdated.nonEmpty) {
+      val bytes = compressor.compressByte(serialize(blockletListUpdated))
+      stream.write(bytes)
+      maxMin +=
+        ((currentBlockId + "", blockletId,
+          (blockletListUpdated.head._1, blockletListUpdated.last._1), position, bytes.length))
+      position += bytes.length
+    }
+    blockletList.clear()
+  }
+
+  /**
+   * Add the column pages row to the datamap, order of pages is same as `indexColumns` in
+   * DataMapMeta returned in DataMapFactory.
+   *
+   * Implementation should copy the content of `pages` as needed, because `pages` memory
+   * may be freed after this method returns, if using unsafe column page.
+   */
+  override def onPageAdded(blockletId: Int,
+      pageId: Int,
+      pages: Array[ColumnPage]): Unit = {
+    val page = pages(0)
+    // Copy each value out of the page and drop its first two bytes. For string pages these
+    // appear to hold the value length.
+    val valuesWithRowId = (0 until page.getPageSize).map { rowId =>
+      val bytes = page.getBytes(rowId)
+      (java.util.Arrays.copyOfRange(bytes, 2, bytes.length), rowId)
+    }
+    // Sort based on the column data in order to create index.
+    val sorted = valuesWithRowId.sortBy(_._1)(keyOrdering)
+    // Merge all same column values to single row. Unlike the previous addedLast logic, this
+    // never drops the final run of values.
+    groupConsecutiveByKey(sorted).foreach { case (value, rowIds) =>
+      blockletList += ((value, rowIds, Seq(pageId)))
+    }
+  }
+
+  /**
+   * This is called during closing of writer.So after this call no more data will be sent to this
+   * class.
+   */
+  override def finish(): Unit = {
+    try {
+      val bytes = compressor.compressByte(serialize(maxMin))
+      stream.write(bytes)
+      stream.writeInt(bytes.length)
+    } finally {
+      stream.close()
+    }
+    commitFile(fgwritepath)
+  }
+
+  /**
+   * Groups runs of equal keys in `sorted`, which must already be ordered by key. Values keep
+   * their original order within each run.
+   */
+  private def groupConsecutiveByKey[A](
+      sorted: Seq[(Array[Byte], A)]): ArrayBuffer[(Array[Byte], List[A])] = {
+    val groups = new ArrayBuffer[(Array[Byte], ArrayBuffer[A])]()
+    sorted.foreach { case (key, value) =>
+      if (groups.nonEmpty && comparer.compareTo(groups.last._1, key) == 0) {
+        groups.last._2 += value
+      } else {
+        groups += ((key, ArrayBuffer(value)))
+      }
+    }
+    groups.map { case (key, values) => (key, values.toList) }
+  }
+
+  /**
+   * Java-serializes `obj` and returns exactly the bytes written. The previous
+   * ByteOutputStream.getBytes returned the whole internal buffer, unused capacity included.
+   */
+  private def serialize(obj: AnyRef): Array[Byte] = {
+    val out = new java.io.ByteArrayOutputStream()
+    val outStream = new ObjectOutputStream(out)
+    try {
+      outStream.writeObject(obj)
+    } finally {
+      outStream.close()
+    }
+    out.toByteArray
+  }
+}
+
+/**
+ * End-to-end checks for the fine-grain test datamap. Queries on a table with FG datamaps
+ * must return the same rows as the same queries on an identical table without datamaps.
+ */
+class FGDataMapTestCase extends QueryTest with BeforeAndAfterAll {
+
+  val file2 = resourcesPath + "/compaction/fil2.csv"
+
+  /** (Re)creates `tableName` with the schema and sort settings shared by this suite. */
+  private def createTable(tableName: String): Unit = {
+    sql(s"DROP TABLE IF EXISTS $tableName")
+    sql(
+      s"""
+         | CREATE TABLE $tableName(id INT, name STRING, city STRING, age INT)
+         | STORED BY 'org.apache.carbondata.format'
+         | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+  }
+
+  /** Registers an FG datamap named `dataMapName` on `datamap_test` that indexes `column`. */
+  private def createFGDataMap(dataMapName: String, column: String): Unit = {
+    sql(
+      s"""
+         | CREATE DATAMAP $dataMapName ON TABLE datamap_test
+         | USING '${classOf[FGDataMapFactory].getName}'
+         | DMPROPERTIES('indexcolumns'='$column')
+      """.stripMargin)
+  }
+
+  /** Loads the generated CSV into `tableName`. */
+  private def loadData(tableName: String): Unit = {
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE $tableName OPTIONS('header'='false')")
+  }
+
+  override protected def beforeAll(): Unit = {
+    // n parameterizes the generated CSV. Original author's note (garbled in the source):
+    // "n should be about 5000000 ... if size is default 1024". TODO confirm its meaning.
+    val n = 150000
+    CompactionSupportGlobalSortBigFileTest.createFile(file2, n * 4, n)
+    createTable("normal_test")
+    loadData("normal_test")
+  }
+
+  test("test fg datamap") {
+    createTable("datamap_test")
+    // register datamap writer
+    createFGDataMap("ggdatamap", "name")
+    loadData("datamap_test")
+    checkAnswer(sql("select * from datamap_test where name='n502670'"),
+      sql("select * from normal_test where name='n502670'"))
+  }
+
+  test("test fg datamap with 2 datamaps ") {
+    createTable("datamap_test")
+    // register datamap writers
+    createFGDataMap("ggdatamap1", "name")
+    createFGDataMap("ggdatamap2", "city")
+    loadData("datamap_test")
+    checkAnswer(sql("select * from datamap_test where name='n502670' and city='c2670'"),
+      sql("select * from normal_test where name='n502670' and city='c2670'"))
+  }
+
+  override protected def afterAll(): Unit = {
+    CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
+    sql("DROP TABLE IF EXISTS normal_test")
+    sql("DROP TABLE IF EXISTS datamap_test")
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ef3031d0/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGIndexDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGIndexDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGIndexDataMapTestCase.scala
deleted file mode 100644
index 8ddad75..0000000
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGIndexDataMapTestCase.scala
+++ /dev/null
@@ -1,474 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark.testsuite.datamap
-
-import java.io.{ByteArrayInputStream, DataOutputStream, ObjectInputStream, ObjectOutputStream}
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-
-import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream
-import org.apache.spark.sql.test.util.QueryTest
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.carbondata.core.datamap.dev.fgdatamap.{AbstractFineGrainIndexDataMap, AbstractFineGrainIndexDataMapFactory}
-import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMapModel}
-import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta}
-import org.apache.carbondata.core.datastore.FileReader
-import org.apache.carbondata.core.datastore.block.SegmentProperties
-import org.apache.carbondata.core.datastore.compression.SnappyCompressor
-import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.datastore.page.ColumnPage
-import org.apache.carbondata.core.indexstore.FineGrainBlocklet
-import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
-import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
-import org.apache.carbondata.core.scan.expression.Expression
-import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression
-import org.apache.carbondata.core.scan.filter.intf.ExpressionType
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
-import org.apache.carbondata.core.util.ByteUtil
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.events.Event
-import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlobalSortBigFileTest
-
-class FGIndexDataMapFactory extends AbstractFineGrainIndexDataMapFactory {
- var identifier: AbsoluteTableIdentifier = _
- var dataMapSchema: DataMapSchema = _
-
- /**
- * Initialization of Datamap factory with the identifier and datamap name
- */
- override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = {
- this.identifier = identifier
- this.dataMapSchema = dataMapSchema
- }
-
- /**
- * Return a new write for this datamap
- */
- override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter = {
- new FGDataMapWriter(identifier, segmentId, dataWritePath, dataMapSchema)
- }
-
- /**
- * Get the datamap for segmentid
- */
- override def getDataMaps(segmentId: String): java.util.List[AbstractFineGrainIndexDataMap] = {
- val file = FileFactory
- .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
-
- val files = file.listFiles(new CarbonFileFilter {
- override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
- })
- files.map { f =>
- val dataMap: AbstractFineGrainIndexDataMap = new FGIndexDataMap()
- dataMap.init(new DataMapModel(f.getCanonicalPath))
- dataMap
- }.toList.asJava
- }
-
- /**
- * Get datamap for distributable object.
- */
- override def getDataMaps(
- distributable: DataMapDistributable): java.util.List[AbstractFineGrainIndexDataMap]= {
- val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable]
- val dataMap: AbstractFineGrainIndexDataMap = new FGIndexDataMap()
- dataMap.init(new DataMapModel(mapDistributable.getFilePath))
- Seq(dataMap).asJava
- }
-
- /**
- * Get all distributable objects of a segmentid
- *
- * @return
- */
- override def toDistributable(segmentId: String): java.util.List[DataMapDistributable] = {
- val file = FileFactory
- .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
-
- val files = file.listFiles(new CarbonFileFilter {
- override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
- })
- files.map { f =>
- val d: DataMapDistributable = new BlockletDataMapDistributable(f.getCanonicalPath)
- d
- }.toList.asJava
- }
-
-
- /**
- *
- * @param event
- */
- override def fireEvent(event: Event):Unit = {
- ???
- }
-
- /**
- * Clears datamap of the segment
- */
- override def clear(segmentId: String): Unit = {
- }
-
- /**
- * Clear all datamaps from memory
- */
- override def clear(): Unit = {
- }
-
- /**
- * Return metadata of this datamap
- */
- override def getMeta: DataMapMeta = {
- new DataMapMeta(dataMapSchema.getProperties.get("indexcolumns").split(",").toList.asJava,
- List(ExpressionType.EQUALS, ExpressionType.IN).asJava)
- }
-}
-
-class FGIndexDataMap extends AbstractFineGrainIndexDataMap {
-
- var maxMin: ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)] = _
- var FileReader: FileReader = _
- var filePath: String = _
- val compressor = new SnappyCompressor
-
- /**
- * It is called to load the data map to memory or to initialize it.
- */
- override def init(dataMapModel: DataMapModel): Unit = {
- this.filePath = dataMapModel.getFilePath
- val size = FileFactory.getCarbonFile(filePath).getSize
- FileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath))
- val footerLen = FileReader.readInt(filePath, size - 4)
- val bytes = FileReader.readByteArray(filePath, size - footerLen - 4, footerLen)
- val in = new ByteArrayInputStream(compressor.unCompressByte(bytes))
- val obj = new ObjectInputStream(in)
- maxMin = obj.readObject()
- .asInstanceOf[ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)]]
- }
-
- /**
- * Prune the datamap with filter expression. It returns the list of
- * blocklets where these filters can exist.
- *
- * @param filterExp
- * @return
- */
- override def prune(
- filterExp: FilterResolverIntf,
- segmentProperties: SegmentProperties,
- partitions: java.util.List[String]): java.util.List[FineGrainBlocklet] = {
- val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]()
- val expression = filterExp.getFilterExpression
- getEqualToExpression(expression, buffer)
- val value = buffer.map { f =>
- f.getChildren.get(1).evaluate(null).getString
- }
- val meta = findMeta(value(0).getBytes)
- meta.map { f =>
- readAndFindData(f, value(0).getBytes())
- }.filter(_.isDefined).map(_.get).asJava
- }
-
- private def readAndFindData(meta: (String, Int, (Array[Byte], Array[Byte]), Long, Int),
- value: Array[Byte]): Option[FineGrainBlocklet] = {
- val bytes = FileReader.readByteArray(filePath, meta._4, meta._5)
- val outputStream = new ByteArrayInputStream(compressor.unCompressByte(bytes))
- val obj = new ObjectInputStream(outputStream)
- val blockletsData = obj.readObject()
- .asInstanceOf[ArrayBuffer[(Array[Byte], Seq[Seq[Int]], Seq[Int])]]
-
- import scala.collection.Searching._
- val searching = blockletsData
- .search[(Array[Byte], Seq[Seq[Int]], Seq[Int])]((value, Seq(Seq(0)), Seq(0)))(new Ordering[
- (Array[Byte], Seq[Seq[Int]], Seq[Int])] {
- override def compare(x: (Array[Byte], Seq[Seq[Int]], Seq[Int]),
- y: (Array[Byte], Seq[Seq[Int]], Seq[Int])) = {
- ByteUtil.UnsafeComparer.INSTANCE.compareTo(x._1, y._1)
- }
- })
- if (searching.insertionPoint >= 0) {
- val f = blockletsData(searching.insertionPoint)
- val pages = f._3.zipWithIndex.map { p =>
- val pg = new FineGrainBlocklet.Page
- pg.setPageId(p._1)
- pg.setRowId(f._2(p._2).toArray)
- pg
- }
- pages
- Some(new FineGrainBlocklet(meta._1, meta._2.toString, pages.toList.asJava))
- } else {
- None
- }
-
- }
-
- private def findMeta(value: Array[Byte]) = {
- val tuples = maxMin.filter { f =>
- ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._1) >= 0 &&
- ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._2) <= 0
- }
- tuples
- }
-
- def getEqualToExpression(expression: Expression, buffer: ArrayBuffer[Expression]): Unit = {
- if (expression.isInstanceOf[EqualToExpression]) {
- buffer += expression
- } else {
- if (expression.getChildren != null) {
- expression.getChildren.asScala.map { f =>
- if (f.isInstanceOf[EqualToExpression]) {
- buffer += f
- }
- getEqualToExpression(f, buffer)
- }
- }
- }
- }
-
- /**
- * Clear complete index table and release memory.
- */
- override def clear():Unit = {
- ???
- }
-
- override def isScanRequired(filterExp: FilterResolverIntf): Boolean = ???
-}
-
-class FGDataMapWriter(identifier: AbsoluteTableIdentifier,
- segmentId: String, dataWriterPath: String, dataMapSchema: DataMapSchema)
- extends AbstractDataMapWriter(identifier, segmentId, dataWriterPath) {
-
- var currentBlockId: String = null
- val fgwritepath = dataWriterPath + "/" + dataMapSchema.getDataMapName + System.nanoTime() +
- ".datamap"
- val stream: DataOutputStream = FileFactory
- .getDataOutputStream(fgwritepath, FileFactory.getFileType(fgwritepath))
- val blockletList = new ArrayBuffer[(Array[Byte], Seq[Int], Seq[Int])]()
- val maxMin = new ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)]()
- var position: Long = 0
- val compressor = new SnappyCompressor
-
- /**
- * Start of new block notification.
- *
- * @param blockId file name of the carbondata file
- */
- override def onBlockStart(blockId: String): Unit = {
- currentBlockId = blockId
- }
-
- /**
- * End of block notification
- */
- override def onBlockEnd(blockId: String): Unit = {
-
- }
-
- /**
- * Start of new blocklet notification.
- *
- * @param blockletId sequence number of blocklet in the block
- */
- override def onBlockletStart(blockletId: Int): Unit = {
-
- }
-
- /**
- * End of blocklet notification
- *
- * @param blockletId sequence number of blocklet in the block
- */
- override def onBlockletEnd(blockletId: Int): Unit = {
- val sorted = blockletList
- .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l._1, r._1) <= 0)
- var oldValue: (Array[Byte], Seq[Seq[Int]], Seq[Int]) = null
- var addedLast: Boolean = false
- val blockletListUpdated = new ArrayBuffer[(Array[Byte], Seq[Seq[Int]], Seq[Int])]()
- // Merge all same column values to single row.
- sorted.foreach { f =>
- if (oldValue != null) {
- if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(f._1, oldValue._1) == 0) {
- oldValue = (oldValue._1, oldValue._2 ++ Seq(f._2), oldValue._3 ++ f._3)
- addedLast = false
- } else {
- blockletListUpdated += oldValue
- oldValue = (f._1, Seq(f._2), f._3)
- addedLast = true
- }
- } else {
- oldValue = (f._1, Seq(f._2), f._3)
- addedLast = false
- }
- }
- if (!addedLast && oldValue != null) {
- blockletListUpdated += oldValue
- }
-
- val out = new ByteOutputStream()
- val outStream = new ObjectOutputStream(out)
- outStream.writeObject(blockletListUpdated)
- outStream.close()
- val bytes = compressor.compressByte(out.getBytes)
- stream.write(bytes)
- maxMin +=
- ((currentBlockId + "", blockletId, (blockletListUpdated.head._1, blockletListUpdated.last
- ._1), position, bytes.length))
- position += bytes.length
- blockletList.clear()
- }
-
- /**
- * Add the column pages row to the datamap, order of pages is same as `indexColumns` in
- * DataMapMeta returned in IndexDataMapFactory.
- *
- * Implementation should copy the content of `pages` as needed, because `pages` memory
- * may be freed after this method returns, if using unsafe column page.
- */
- override def onPageAdded(blockletId: Int,
- pageId: Int,
- pages: Array[ColumnPage]): Unit = {
- val size = pages(0).getPageSize
- val list = new ArrayBuffer[(Array[Byte], Int)]()
- var i = 0
- while (i < size) {
- val bytes = pages(0).getBytes(i)
- val newBytes = new Array[Byte](bytes.length - 2)
- System.arraycopy(bytes, 2, newBytes, 0, newBytes.length)
- list += ((newBytes, i))
- i = i + 1
- }
- // Sort based on the column data in order to create index.
- val sorted = list
- .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l._1, r._1) <= 0)
- var oldValue: (Array[Byte], Seq[Int], Seq[Int]) = null
- var addedLast: Boolean = false
- // Merge all same column values to single row.
- sorted.foreach { f =>
- if (oldValue != null) {
- if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(f._1, oldValue._1) == 0) {
- oldValue = (oldValue._1, oldValue._2 ++ Seq(f._2), oldValue._3)
- addedLast = false
- } else {
- blockletList += oldValue
- oldValue = (f._1, Seq(f._2), Seq(pageId))
- addedLast = true
- }
- } else {
- oldValue = (f._1, Seq(f._2), Seq(pageId))
- addedLast = false
- }
- }
- if (!addedLast && oldValue != null) {
- blockletList += oldValue
- }
- }
-
-
- /**
- * This is called during closing of writer.So after this call no more data will be sent to this
- * class.
- */
- override def finish(): Unit = {
- val out = new ByteOutputStream()
- val outStream = new ObjectOutputStream(out)
- outStream.writeObject(maxMin)
- outStream.close()
- val bytes = compressor.compressByte(out.getBytes)
- stream.write(bytes)
- stream.writeInt(bytes.length)
- stream.close()
- commitFile(fgwritepath)
- }
-}
-
-class FGIndexDataMapTestCase extends QueryTest with BeforeAndAfterAll {
-
- val file2 = resourcesPath + "/compaction/fil2.csv"
-
- override protected def beforeAll(): Unit = {
- //n should be about 5000000 of reset if size is default 1024
- val n = 150000
- CompactionSupportGlobalSortBigFileTest.createFile(file2, n * 4, n)
- sql("DROP TABLE IF EXISTS normal_test")
- sql(
- """
- | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT)
- | STORED BY 'org.apache.carbondata.format'
- | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
- """.stripMargin)
- sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')")
- }
-
- test("test fg datamap") {
- sql("DROP TABLE IF EXISTS datamap_test")
- sql(
- """
- | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
- | STORED BY 'org.apache.carbondata.format'
- | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
- """.stripMargin)
- val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test")
- // register datamap writer
- sql(
- s"""
- | CREATE DATAMAP ggdatamap ON TABLE datamap_test
- | USING '${classOf[FGIndexDataMapFactory].getName}'
- | DMPROPERTIES('indexcolumns'='name')
- """.stripMargin)
- sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
- checkAnswer(sql("select * from datamap_test where name='n502670'"),
- sql("select * from normal_test where name='n502670'"))
- }
-
- test("test fg datamap with 2 datamaps ") {
- sql("DROP TABLE IF EXISTS datamap_test")
- sql(
- """
- | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
- | STORED BY 'org.apache.carbondata.format'
- | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
- """.stripMargin)
- val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test")
- // register datamap writer
- sql(
- s"""
- | CREATE DATAMAP ggdatamap1 ON TABLE datamap_test
- | USING '${classOf[FGIndexDataMapFactory].getName}'
- | DMPROPERTIES('indexcolumns'='name')
- """.stripMargin)
- sql(
- s"""
- | CREATE DATAMAP ggdatamap2 ON TABLE datamap_test
- | USING '${classOf[FGIndexDataMapFactory].getName}'
- | DMPROPERTIES('indexcolumns'='city')
- """.stripMargin)
- sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
- checkAnswer(sql("select * from datamap_test where name='n502670' and city='c2670'"),
- sql("select * from normal_test where name='n502670' and city='c2670'"))
- }
-
- override protected def afterAll(): Unit = {
- CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
- sql("DROP TABLE IF EXISTS normal_test")
- sql("DROP TABLE IF EXISTS datamap_test")
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ef3031d0/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/IndexDataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/IndexDataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/IndexDataMapWriterSuite.scala
deleted file mode 100644
index 5fd8ae9..0000000
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/IndexDataMapWriterSuite.scala
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.testsuite.datamap
-
-import java.util
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.test.util.QueryTest
-import org.apache.spark.sql.{DataFrame, SaveMode}
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter
-import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainIndexDataMap, AbstractCoarseGrainIndexDataMapFactory}
-import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta}
-import org.apache.carbondata.core.datastore.page.ColumnPage
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
-import org.apache.carbondata.core.scan.filter.intf.ExpressionType
-import org.apache.carbondata.core.metadata.datatype.DataTypes
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.events.Event
-
-class C2IndexDataMapFactory() extends AbstractCoarseGrainIndexDataMapFactory {
-
- var identifier: AbsoluteTableIdentifier = _
-
- override def init(identifier: AbsoluteTableIdentifier,
- dataMapSchema: DataMapSchema): Unit = {
- this.identifier = identifier
- }
-
- override def fireEvent(event: Event): Unit = ???
-
- override def clear(segmentId: String): Unit = {}
-
- override def clear(): Unit = {}
-
- override def getDataMaps(distributable: DataMapDistributable): java.util.List[AbstractCoarseGrainIndexDataMap] = ???
-
- override def getDataMaps(segmentId: String): util.List[AbstractCoarseGrainIndexDataMap] = ???
-
- override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter =
- IndexDataMapWriterSuite.dataMapWriterC2Mock(identifier, segmentId, dataWritePath)
-
- override def getMeta: DataMapMeta = new DataMapMeta(List("c2").asJava, List(ExpressionType.EQUALS).asJava)
-
- /**
- * Get all distributable objects of a segmentid
- *
- * @return
- */
- override def toDistributable(segmentId: String): util.List[DataMapDistributable] = {
- ???
- }
-
-}
-
-class IndexDataMapWriterSuite extends QueryTest with BeforeAndAfterAll {
- def buildTestData(numRows: Int): DataFrame = {
- import sqlContext.implicits._
- sqlContext.sparkContext.parallelize(1 to numRows, 1)
- .map(x => ("a" + x, "b", x))
- .toDF("c1", "c2", "c3")
- }
-
- def dropTable(): Unit = {
- sql("DROP TABLE IF EXISTS carbon1")
- sql("DROP TABLE IF EXISTS carbon2")
- }
-
- override def beforeAll {
- dropTable()
- }
-
- test("test write datamap 2 pages") {
- sql(s"CREATE TABLE carbon1(c1 STRING, c2 STRING, c3 INT) STORED BY 'org.apache.carbondata.format'")
- // register datamap writer
- sql(s"CREATE DATAMAP test ON TABLE carbon1 USING '${classOf[C2IndexDataMapFactory].getName}'")
- val df = buildTestData(33000)
-
- // save dataframe to carbon file
- df.write
- .format("carbondata")
- .option("tableName", "carbon1")
- .option("tempCSV", "false")
- .option("sort_columns","c1")
- .mode(SaveMode.Overwrite)
- .save()
-
- assert(IndexDataMapWriterSuite.callbackSeq.head.contains("block start"))
- assert(IndexDataMapWriterSuite.callbackSeq.last.contains("block end"))
- assert(
- IndexDataMapWriterSuite.callbackSeq.slice(1, IndexDataMapWriterSuite.callbackSeq.length - 1) == Seq(
- "blocklet start 0",
- "add page data: blocklet 0, page 0",
- "add page data: blocklet 0, page 1",
- "blocklet end: 0"
- ))
- IndexDataMapWriterSuite.callbackSeq = Seq()
- }
-
- test("test write datamap 2 blocklet") {
- sql(s"CREATE TABLE carbon2(c1 STRING, c2 STRING, c3 INT) STORED BY 'org.apache.carbondata.format'")
- sql(s"CREATE DATAMAP test ON TABLE carbon2 USING '${classOf[C2IndexDataMapFactory].getName}'")
-
- CarbonProperties.getInstance()
- .addProperty("carbon.blockletgroup.size.in.mb", "1")
- CarbonProperties.getInstance()
- .addProperty("carbon.number.of.cores.while.loading",
- CarbonCommonConstants.NUM_CORES_DEFAULT_VAL)
-
- val df = buildTestData(300000)
-
- // save dataframe to carbon file
- df.write
- .format("carbondata")
- .option("tableName", "carbon2")
- .option("tempCSV", "false")
- .option("sort_columns","c1")
- .option("SORT_SCOPE","GLOBAL_SORT")
- .mode(SaveMode.Overwrite)
- .save()
-
- assert(IndexDataMapWriterSuite.callbackSeq.head.contains("block start"))
- assert(IndexDataMapWriterSuite.callbackSeq.last.contains("block end"))
- // corrected test case the min "carbon.blockletgroup.size.in.mb" size could not be less than
- // 64 MB
- assert(
- IndexDataMapWriterSuite.callbackSeq.slice(1, IndexDataMapWriterSuite.callbackSeq.length - 1) == Seq(
- "blocklet start 0",
- "add page data: blocklet 0, page 0",
- "add page data: blocklet 0, page 1",
- "add page data: blocklet 0, page 2",
- "add page data: blocklet 0, page 3",
- "add page data: blocklet 0, page 4",
- "add page data: blocklet 0, page 5",
- "add page data: blocklet 0, page 6",
- "add page data: blocklet 0, page 7",
- "add page data: blocklet 0, page 8",
- "add page data: blocklet 0, page 9",
- "blocklet end: 0"
- ))
- IndexDataMapWriterSuite.callbackSeq = Seq()
- }
-
- override def afterAll {
- dropTable()
- }
-}
-
-object IndexDataMapWriterSuite {
-
- var callbackSeq: Seq[String] = Seq[String]()
-
- def dataMapWriterC2Mock(identifier: AbsoluteTableIdentifier, segmentId: String,
- dataWritePath: String) =
- new AbstractDataMapWriter(identifier, segmentId, dataWritePath) {
-
- override def onPageAdded(
- blockletId: Int,
- pageId: Int,
- pages: Array[ColumnPage]): Unit = {
- assert(pages.length == 1)
- assert(pages(0).getDataType == DataTypes.STRING)
- val bytes: Array[Byte] = pages(0).getByteArrayPage()(0)
- assert(bytes.sameElements(Seq(0, 1, 'b'.toByte)))
- callbackSeq :+= s"add page data: blocklet $blockletId, page $pageId"
- }
-
- override def onBlockletEnd(blockletId: Int): Unit = {
- callbackSeq :+= s"blocklet end: $blockletId"
- }
-
- override def onBlockEnd(blockId: String): Unit = {
- callbackSeq :+= s"block end $blockId"
- }
-
- override def onBlockletStart(blockletId: Int): Unit = {
- callbackSeq :+= s"blocklet start $blockletId"
- }
-
- /**
- * Start of new block notification.
- *
- * @param blockId file name of the carbondata file
- */
- override def onBlockStart(blockId: String) = {
- callbackSeq :+= s"block start $blockId"
- }
-
- /**
- * This is called during closing of writer.So after this call no more data will be sent to this
- * class.
- */
- override def finish() = {
-
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ef3031d0/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
new file mode 100644
index 0000000..b020aa9
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.datamap
+
+import java.io.{File, FilenameFilter}
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.common.exceptions.MetadataProcessException
+import org.apache.carbondata.common.exceptions.sql.{MalformedDataMapCommandException, NoSuchDataMapException}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+/**
+ * Tests for CREATE / DROP / SHOW DATAMAP. Covered cases:
+ *  - negative cases using a provider class that is rejected with MetadataProcessException;
+ *  - pre-aggregate datamaps with the default metastore and the hive schema metastore;
+ *  - pre-aggregate datamaps stored at a custom 'path'.
+ */
+class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
+
+  val testData = s"$resourcesPath/sample.csv"
+
+  override def beforeAll(): Unit = {
+    sql("drop table if exists datamaptest")
+    sql("drop table if exists datamapshowtest")
+    sql("drop table if exists uniqdata")
+    sql("create table datamaptest (a string, b string, c string) stored by 'carbondata'")
+  }
+
+  /**
+   * Provider class used by the negative tests below. Every CREATE DATAMAP ... USING this
+   * class is expected to fail with MetadataProcessException.
+   */
+  val newClass = "org.apache.spark.sql.CarbonSource"
+
+  /**
+   * Runs `body` with ENABLE_HIVE_SCHEMA_META_STORE set to "true". The property is reset to
+   * ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT afterwards, even when `body` throws, so the
+   * setting cannot leak into later tests.
+   */
+  private def withHiveSchemaMetaStore(body: => Unit): Unit = {
+    try {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, "true")
+      body
+    } finally {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+          CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
+    }
+  }
+
+  test("test datamap create: don't support using non-exist class") {
+    intercept[MetadataProcessException] {
+      sql(s"CREATE DATAMAP datamap1 ON TABLE datamaptest USING '$newClass'")
+    }
+  }
+
+  test("test datamap create with dmproperties: don't support using non-exist class") {
+    intercept[MetadataProcessException] {
+      sql(s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass' DMPROPERTIES('key'='value')")
+    }
+  }
+
+  // NOTE(review): this body is identical to the previous test. It does not create the same
+  // datamap name twice, so it does not exercise the "existing name" case its title describes.
+  test("test datamap create with existing name: don't support using non-exist class") {
+    intercept[MetadataProcessException] {
+      sql(
+        s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass' DMPROPERTIES('key'='value')")
+    }
+  }
+
+  test("test datamap create with preagg") {
+    sql("drop datamap if exists datamap3 on table datamaptest")
+    sql(
+      "create datamap datamap3 on table datamaptest using 'preaggregate' as select count(a) from datamaptest")
+    val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
+    assert(table != null)
+    val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
+    assert(dataMapSchemaList.size() == 1)
+    assert(dataMapSchemaList.get(0).getDataMapName.equals("datamap3"))
+    assert(dataMapSchemaList.get(0).getChildSchema.getTableName.equals("datamaptest_datamap3"))
+  }
+
+  test("check hivemetastore after drop datamap") {
+    withHiveSchemaMetaStore {
+      try {
+        sql("drop table if exists hiveMetaStoreTable")
+        sql("create table hiveMetaStoreTable (a string, b string, c string) stored by 'carbondata'")
+
+        sql(
+          "create datamap datamap_hiveMetaStoreTable on table hiveMetaStoreTable using 'preaggregate' as select count(a) from hiveMetaStoreTable")
+        checkExistence(sql("show datamap on table hiveMetaStoreTable"), true, "datamap_hiveMetaStoreTable")
+
+        sql("drop datamap datamap_hiveMetaStoreTable on table hiveMetaStoreTable")
+        checkExistence(sql("show datamap on table hiveMetaStoreTable"), false, "datamap_hiveMetaStoreTable")
+      } finally {
+        // "if exists" keeps a failure before/at table creation from being masked by cleanup.
+        sql("drop table if exists hiveMetaStoreTable")
+      }
+    }
+  }
+
+  test("drop the table having pre-aggregate") {
+    withHiveSchemaMetaStore {
+      sql("drop table if exists hiveMetaStoreTable_1")
+      sql("create table hiveMetaStoreTable_1 (a string, b string, c string) stored by 'carbondata'")
+
+      sql(
+        "create datamap datamap_hiveMetaStoreTable_1 on table hiveMetaStoreTable_1 using 'preaggregate' as select count(a) from hiveMetaStoreTable_1")
+
+      checkExistence(sql("show datamap on table hiveMetaStoreTable_1"),
+        true,
+        "datamap_hiveMetaStoreTable_1")
+
+      sql("drop table hiveMetaStoreTable_1")
+
+      // checkExistence is a case-sensitive substring match, and the catalog lists table
+      // names in lower case. The key is lower-cased so this check can actually detect a
+      // surviving child table; a mixed-case key could never match.
+      checkExistence(sql("show tables"), false, "datamap_hiveMetaStoreTable_1".toLowerCase)
+    }
+  }
+
+  test("test datamap create with preagg with duplicate name") {
+    sql(
+      s"""
+         | CREATE DATAMAP datamap10 ON TABLE datamaptest
+         | USING 'preaggregate'
+         | AS SELECT COUNT(a) FROM datamaptest
+      """.stripMargin)
+    intercept[MalformedDataMapCommandException] {
+      sql(
+        s"""
+           | CREATE DATAMAP datamap10 ON TABLE datamaptest
+           | USING 'preaggregate'
+           | AS SELECT COUNT(a) FROM datamaptest
+        """.stripMargin)
+    }
+    val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
+    assert(table != null)
+    val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
+    assert(dataMapSchemaList.size() == 2)
+  }
+
+  test("test drop non-exist datamap") {
+    intercept[NoSuchDataMapException] {
+      sql("drop datamap nonexist on table datamaptest")
+    }
+    val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
+    assert(table != null)
+    val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
+    assert(dataMapSchemaList.size() == 2)
+  }
+
+  // In the three "show datamap ... non-exist class" tests below, only the CREATE DATAMAP
+  // using `newClass` is expected to throw. The setup stays outside `intercept`, so a setup
+  // failure cannot make the test pass. Statements that used to follow the throwing call were
+  // unreachable and have been removed.
+  test("test show datamap without preaggregate: don't support using non-exist class") {
+    sql("drop table if exists datamapshowtest")
+    sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
+    intercept[MetadataProcessException] {
+      sql(s"CREATE DATAMAP datamap1 ON TABLE datamapshowtest USING '$newClass' ")
+    }
+  }
+
+  test("test show datamap with preaggregate: don't support using non-exist class") {
+    sql("drop table if exists datamapshowtest")
+    sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
+    sql("create datamap datamap1 on table datamapshowtest using 'preaggregate' as select count(a) from datamapshowtest")
+    intercept[MetadataProcessException] {
+      sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' ")
+    }
+  }
+
+  test("test show datamap with no datamap") {
+    sql("drop table if exists datamapshowtest")
+    sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
+    assert(sql("show datamap on table datamapshowtest").collect().length == 0)
+  }
+
+  test("test show datamap after dropping datamap: don't support using non-exist class") {
+    sql("drop table if exists datamapshowtest")
+    sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
+    sql("create datamap datamap1 on table datamapshowtest using 'preaggregate' as select count(a) from datamapshowtest")
+    intercept[MetadataProcessException] {
+      sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' ")
+    }
+  }
+
+  test("test if preaggregate load is successfull for hivemetastore") {
+    withHiveSchemaMetaStore {
+      sql("DROP TABLE IF EXISTS maintable")
+      sql(
+        """
+          | CREATE TABLE maintable(id int, name string, city string, age int)
+          | STORED BY 'org.apache.carbondata.format'
+        """.stripMargin)
+      sql("create datamap preagg_sum on table maintable using 'preaggregate' as select id,sum(age) from maintable group by id")
+      sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+      checkAnswer(sql(s"select * from maintable_preagg_sum"),
+        Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55)))
+    }
+  }
+
+  test("test preaggregate load for decimal column for hivemetastore") {
+    // Runs under the helper so the hive-metastore property is restored even if this fails.
+    withHiveSchemaMetaStore {
+      sql("CREATE TABLE uniqdata(CUST_ID int,CUST_NAME String,ACTIVE_EMUI_VERSION string,DOB timestamp,DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 decimal(30,10),DECIMAL_COLUMN2 decimal(36,10),Double_COLUMN1 double, Double_COLUMN2 double,INTEGER_COLUMN1 int) STORED BY 'org.apache.carbondata.format'")
+      sql("insert into uniqdata select 9000,'CUST_NAME_00000','ACTIVE_EMUI_VERSION_00000','1970-01-01 01:00:03','1970-01-01 02:00:03',123372036854,-223372036854,12345678901.1234000000,22345678901.1234000000,11234567489.7976000000,-11234567489.7976000000,1")
+      sql("create datamap uniqdata_agg on table uniqdata using 'preaggregate' as select min(DECIMAL_COLUMN1) from uniqdata group by DECIMAL_COLUMN1")
+      checkAnswer(sql("select * from uniqdata_uniqdata_agg"), Seq(Row(12345678901.1234000000, 12345678901.1234000000)))
+      sql("drop datamap if exists uniqdata_agg on table uniqdata")
+    }
+  }
+
+  test("create pre-agg table with path") {
+    sql("drop table if exists main_preagg")
+    sql("drop table if exists main ")
+    val warehouse = s"$metastoredb/warehouse"
+    val path = warehouse + "/" + System.nanoTime + "_preAggTestPath"
+    sql(
+      s"""
+         | create table main(
+         |     year int,
+         |     month int,
+         |     name string,
+         |     salary int)
+         | stored by 'carbondata'
+         | tblproperties('sort_columns'='month,year,name')
+      """.stripMargin)
+    sql("insert into main select 10,11,'amy',12")
+    sql("insert into main select 10,11,'amy',14")
+    sql(
+      s"""
+         | create datamap preagg
+         | on table main
+         | using 'preaggregate'
+         | dmproperties ('path'='$path')
+         | as select name,avg(salary)
+         |    from main
+         |    group by name
+       """.stripMargin)
+    assertResult(true)(new File(path).exists())
+    val factFiles = new File(s"${CarbonTablePath.getSegmentPath(path, "0")}")
+      .list(new FilenameFilter {
+        override def accept(dir: File, name: String): Boolean = {
+          name.contains(CarbonCommonConstants.FACT_FILE_EXT)
+        }
+      })
+    // File.list returns null when the segment directory is missing. Treat that as "no fact
+    // files" so the assertion fails cleanly instead of with a NullPointerException.
+    assertResult(true)(factFiles != null && factFiles.length > 0)
+    checkAnswer(sql("select name,avg(salary) from main group by name"), Row("amy", 13.0))
+    checkAnswer(sql("select * from main_preagg"), Row("amy", 26, 2))
+    sql("drop datamap preagg on table main")
+    assertResult(false)(new File(path).exists())
+    sql("drop table main")
+  }
+
+  override def afterAll(): Unit = {
+    sql("DROP TABLE IF EXISTS maintable")
+    sql("drop table if exists uniqdata")
+    // The path test drops `main` only when it succeeds; clean it up here as well.
+    sql("drop table if exists main")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+      CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
+    sql("drop table if exists datamaptest")
+    sql("drop table if exists datamapshowtest")
+  }
+}