Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/03/04 14:57:47 UTC
[05/20] carbondata git commit: [CARBONDATA-2189] Add DataMapProvider developer interface
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8ded96e/docs/datamap-developer-guide.md
----------------------------------------------------------------------
diff --git a/docs/datamap-developer-guide.md b/docs/datamap-developer-guide.md
new file mode 100644
index 0000000..31afd34
--- /dev/null
+++ b/docs/datamap-developer-guide.md
@@ -0,0 +1,16 @@
+# DataMap Developer Guide
+
+### Introduction
+DataMap is a data structure that can be used to accelerate certain queries on a table. Developers can implement different kinds of DataMap.
+Currently, two types of DataMap are supported:
+1. IndexDataMap: a DataMap that leverages an index to accelerate filter queries
+2. MVDataMap: a DataMap that leverages a Materialized View to accelerate OLAP-style queries, such as SPJG queries (select, predicate, join, group by)
+
+### DataMap provider
+When a user issues `CREATE DATAMAP dm ON TABLE main USING 'provider'`, the corresponding DataMapProvider implementation will be created and initialized.
+Currently, the provider string can be:
+1. preaggregate: a type of MVDataMap that pre-aggregates a single table
+2. timeseries: a type of MVDataMap that pre-aggregates based on the time dimension of the table
+3. the class name of an IndexDataMapFactory implementation: developers can implement a new type of IndexDataMap by extending IndexDataMapFactory
+
+When a user issues `DROP DATAMAP dm ON TABLE main`, the corresponding DataMapProvider interface will be called.
\ No newline at end of file
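To make the provider strings in the new guide concrete, here is a minimal usage sketch in the same Spark SQL style as the test suites changed later in this commit. The table `main`, the datamap names, and the factory class `com.example.MyIndexDataMapFactory` are illustrative placeholders rather than part of the commit; the `preaggregate ... AS SELECT` form follows the existing pre-aggregate test conventions.

```scala
// Minimal sketch (assumes a Spark session with CarbonData enabled, as in the test suites).
// 'preaggregate' and 'timeseries' select built-in MVDataMap providers; any other provider
// string is treated as the class name of an IndexDataMapFactory implementation.
sql("CREATE DATAMAP agg_sales ON TABLE main USING 'preaggregate' " +
  "AS SELECT country, sum(amount) FROM main GROUP BY country")

// Hypothetical custom index datamap registered by factory class name, analogous to
// CGIndexDataMapFactory in the tests below; 'indexcolumns' is the DMPROPERTY those tests use.
sql("CREATE DATAMAP dm ON TABLE main " +
  "USING 'com.example.MyIndexDataMapFactory' DMPROPERTIES('indexcolumns'='name')")

// Dropping the datamap calls back into the corresponding DataMapProvider.
sql("DROP DATAMAP dm ON TABLE main")
```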
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8ded96e/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 3bc4547..007ba2f 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -43,8 +43,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.exception.InvalidConfigurationException;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.PartitionSpec;
-import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
-import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletIndexDataMapFactory;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.metadata.schema.PartitionInfo;
@@ -756,7 +755,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
DistributableDataMapFormat datamapDstr =
new DistributableDataMapFormat(absoluteTableIdentifier, dataMapExprWrapper,
segmentIds, partitionsToPrune,
- BlockletDataMapFactory.class.getName());
+ BlockletIndexDataMapFactory.class.getName());
prunedBlocklets = dataMapJob.execute(datamapDstr, resolver);
// Apply expression on the blocklets.
prunedBlocklets = dataMapExprWrapper.pruneBlocklets(prunedBlocklets);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8ded96e/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
index b1962c1..f208c92 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
@@ -286,7 +286,7 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
| GROUP BY dob,name
""".stripMargin)
}
- assert(e.getMessage.contains(s"$timeSeries keyword missing"))
+ assert(e.getMessage.contains("Only 'path' dmproperty is allowed for this datamap"))
sql("DROP TABLE IF EXISTS maintabletime")
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8ded96e/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
index 0f59949..6f78285 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
@@ -303,8 +303,8 @@ test("test PreAggregate table selection with timeseries and normal together") {
| GROUP BY dob,name
""".stripMargin)
- val df = sql("SELECT timeseries(dob,'year') FROM maintabletime GROUP BY timeseries(dob,'year')")
- preAggTableValidator(df.queryExecution.analyzed, "maintabletime_agg1_year")
+ val df = sql("SELECT timeseries(dob,'year') FROM maintabletime GROUP BY timeseries(dob,'year')")
+ preAggTableValidator(df.queryExecution.analyzed, "maintabletime_agg1_year")
sql("DROP TABLE IF EXISTS maintabletime")
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8ded96e/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
index ec76b37..efe34c6 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
@@ -201,7 +201,7 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
| GROUP BY dataTime
""".stripMargin)
}
- assert(e.getMessage.equals("DataMap class 'abc' not found"))
+ assert(e.getMessage.equals("DataMap 'abc' not found"))
}
test("test timeseries create table 12: USING and catch MalformedCarbonCommandException") {
@@ -216,7 +216,7 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
| GROUP BY dataTime
""".stripMargin)
}
- assert(e.getMessage.equals("DataMap class 'abc' not found"))
+ assert(e.getMessage.equals("DataMap 'abc' not found"))
}
test("test timeseries create table 13: Only one granularity level can be defined 1") {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8ded96e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
deleted file mode 100644
index 041a63a..0000000
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
+++ /dev/null
@@ -1,381 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark.testsuite.datamap
-
-import java.io.{ByteArrayInputStream, DataOutputStream, ObjectInputStream, ObjectOutputStream}
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-
-import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream
-import org.apache.spark.sql.test.util.QueryTest
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainDataMap, AbstractCoarseGrainDataMapFactory}
-import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMapModel}
-import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager, Segment}
-import org.apache.carbondata.core.datastore.FileReader
-import org.apache.carbondata.core.datastore.block.SegmentProperties
-import org.apache.carbondata.core.datastore.compression.SnappyCompressor
-import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.datastore.page.ColumnPage
-import org.apache.carbondata.core.indexstore.{Blocklet, PartitionSpec}
-import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
-import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
-import org.apache.carbondata.core.scan.expression.Expression
-import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression
-import org.apache.carbondata.core.scan.filter.intf.ExpressionType
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
-import org.apache.carbondata.core.util.ByteUtil
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.events.Event
-import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlobalSortBigFileTest
-
-class CGDataMapFactory extends AbstractCoarseGrainDataMapFactory {
- var identifier: AbsoluteTableIdentifier = _
- var dataMapSchema: DataMapSchema = _
-
- /**
- * Initialization of Datamap factory with the identifier and datamap name
- */
- override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = {
- this.identifier = identifier
- this.dataMapSchema = dataMapSchema
- }
-
- /**
- * Return a new write for this datamap
- */
- override def createWriter(segment: Segment, dataWritePath: String): AbstractDataMapWriter = {
- new CGDataMapWriter(identifier, segment, dataWritePath, dataMapSchema)
- }
-
- /**
- * Get the datamap for segmentid
- */
- override def getDataMaps(segment: Segment): java.util.List[AbstractCoarseGrainDataMap] = {
- val file = FileFactory.getCarbonFile(
- CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo))
-
- val files = file.listFiles(new CarbonFileFilter {
- override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
- })
- files.map {f =>
- val dataMap: AbstractCoarseGrainDataMap = new CGDataMap()
- dataMap.init(new DataMapModel(f.getCanonicalPath))
- dataMap
- }.toList.asJava
- }
-
-
- /**
- * Get datamaps for distributable object.
- */
- override def getDataMaps(
- distributable: DataMapDistributable): java.util.List[AbstractCoarseGrainDataMap] = {
- val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable]
- val dataMap: AbstractCoarseGrainDataMap = new CGDataMap()
- dataMap.init(new DataMapModel(mapDistributable.getFilePath))
- Seq(dataMap).asJava
- }
-
- /**
- *
- * @param event
- */
- override def fireEvent(event: Event): Unit = {
- ???
- }
-
- /**
- * Get all distributable objects of a segmentid
- *
- * @return
- */
- override def toDistributable(segment: Segment): java.util.List[DataMapDistributable] = {
- val file = FileFactory.getCarbonFile(
- CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo))
-
- val files = file.listFiles(new CarbonFileFilter {
- override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
- })
- files.map { f =>
- val d:DataMapDistributable = new BlockletDataMapDistributable(f.getCanonicalPath)
- d
- }.toList.asJava
- }
-
-
- /**
- * Clears datamap of the segment
- */
- override def clear(segment: Segment): Unit = {
-
- }
-
- /**
- * Clear all datamaps from memory
- */
- override def clear(): Unit = {
-
- }
-
- /**
- * Return metadata of this datamap
- */
- override def getMeta: DataMapMeta = {
- new DataMapMeta(dataMapSchema.getProperties.get("indexcolumns").split(",").toList.asJava,
- List(ExpressionType.EQUALS, ExpressionType.IN).asJava)
- }
-}
-
-class CGDataMap extends AbstractCoarseGrainDataMap {
-
- var maxMin: ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))] = _
- var FileReader: FileReader = _
- var filePath: String = _
- val compressor = new SnappyCompressor
-
- /**
- * It is called to load the data map to memory or to initialize it.
- */
- override def init(dataMapModel: DataMapModel): Unit = {
- this.filePath = dataMapModel.getFilePath
- val size = FileFactory.getCarbonFile(filePath).getSize
- FileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath))
- val footerLen = FileReader.readInt(filePath, size-4)
- val bytes = FileReader.readByteArray(filePath, size-footerLen-4, footerLen)
- val in = new ByteArrayInputStream(compressor.unCompressByte(bytes))
- val obj = new ObjectInputStream(in)
- maxMin = obj.readObject().asInstanceOf[ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))]]
- }
-
- /**
- * Prune the datamap with filter expression. It returns the list of
- * blocklets where these filters can exist.
- *
- * @param filterExp
- * @return
- */
- override def prune(
- filterExp: FilterResolverIntf,
- segmentProperties: SegmentProperties,
- partitions: java.util.List[PartitionSpec]): java.util.List[Blocklet] = {
- val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]()
- val expression = filterExp.getFilterExpression
- getEqualToExpression(expression, buffer)
- val value = buffer.map { f =>
- f.getChildren.get(1).evaluate(null).getString
- }
- val meta = findMeta(value(0).getBytes)
- meta.map { f=>
- new Blocklet(f._1, f._2 + "")
- }.asJava
- }
-
-
- private def findMeta(value: Array[Byte]) = {
- val tuples = maxMin.filter { f =>
- ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._1) <= 0 &&
- ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._2) >= 0
- }
- tuples
- }
-
- private def getEqualToExpression(expression: Expression, buffer: ArrayBuffer[Expression]): Unit = {
- if (expression.isInstanceOf[EqualToExpression]) {
- buffer += expression
- } else {
- if (expression.getChildren != null) {
- expression.getChildren.asScala.map { f =>
- if (f.isInstanceOf[EqualToExpression]) {
- buffer += f
- }
- getEqualToExpression(f, buffer)
- }
- }
- }
- }
-
- /**
- * Clear complete index table and release memory.
- */
- override def clear() = {
- ???
- }
-
- override def isScanRequired(filterExp: FilterResolverIntf): Boolean = ???
-}
-
-class CGDataMapWriter(identifier: AbsoluteTableIdentifier,
- segment: Segment,
- dataWritePath: String,
- dataMapSchema: DataMapSchema)
- extends AbstractDataMapWriter(identifier, segment, dataWritePath) {
-
- var currentBlockId: String = null
- val cgwritepath = dataWritePath + "/" +
- dataMapSchema.getDataMapName + System.nanoTime() + ".datamap"
- lazy val stream: DataOutputStream = FileFactory
- .getDataOutputStream(cgwritepath, FileFactory.getFileType(cgwritepath))
- val blockletList = new ArrayBuffer[Array[Byte]]()
- val maxMin = new ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))]()
- val compressor = new SnappyCompressor
-
- /**
- * Start of new block notification.
- *
- * @param blockId file name of the carbondata file
- */
- override def onBlockStart(blockId: String): Unit = {
- currentBlockId = blockId
- }
-
- /**
- * End of block notification
- */
- override def onBlockEnd(blockId: String): Unit = {
-
- }
-
- /**
- * Start of new blocklet notification.
- *
- * @param blockletId sequence number of blocklet in the block
- */
- override def onBlockletStart(blockletId: Int): Unit = {
-
- }
-
- /**
- * End of blocklet notification
- *
- * @param blockletId sequence number of blocklet in the block
- */
- override def onBlockletEnd(blockletId: Int): Unit = {
- val sorted = blockletList
- .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l, r) <= 0)
- maxMin +=
- ((currentBlockId+"", blockletId, (sorted.last, sorted.head)))
- blockletList.clear()
- }
-
- /**
- * Add the column pages row to the datamap, order of pages is same as `indexColumns` in
- * DataMapMeta returned in DataMapFactory.
- *
- * Implementation should copy the content of `pages` as needed, because `pages` memory
- * may be freed after this method returns, if using unsafe column page.
- */
- override def onPageAdded(blockletId: Int,
- pageId: Int,
- pages: Array[ColumnPage]): Unit = {
- val size = pages(0).getPageSize
- val list = new ArrayBuffer[Array[Byte]]()
- var i = 0
- while (i < size) {
- val bytes = pages(0).getBytes(i)
- val newBytes = new Array[Byte](bytes.length - 2)
- System.arraycopy(bytes, 2, newBytes, 0, newBytes.length)
- list += newBytes
- i = i + 1
- }
- // Sort based on the column data in order to create index.
- val sorted = list
- .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l, r) <= 0)
- blockletList += sorted.head
- blockletList += sorted.last
- }
-
-
- /**
- * This is called during closing of writer.So after this call no more data will be sent to this
- * class.
- */
- override def finish(): Unit = {
- val out = new ByteOutputStream()
- val outStream = new ObjectOutputStream(out)
- outStream.writeObject(maxMin)
- outStream.close()
- val bytes = compressor.compressByte(out.getBytes)
- stream.write(bytes)
- stream.writeInt(bytes.length)
- stream.close()
- commitFile(cgwritepath)
- }
-
-
-}
-
-class CGDataMapTestCase extends QueryTest with BeforeAndAfterAll {
-
- val file2 = resourcesPath + "/compaction/fil2.csv"
- override protected def beforeAll(): Unit = {
- //n should be about 5000000 of reset if size is default 1024
- val n = 150000
- CompactionSupportGlobalSortBigFileTest.createFile(file2, n * 4, n)
- sql("DROP TABLE IF EXISTS normal_test")
- sql(
- """
- | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT)
- | STORED BY 'org.apache.carbondata.format'
- | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
- """.stripMargin)
- sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')")
- }
-
- test("test cg datamap") {
- sql("DROP TABLE IF EXISTS datamap_test_cg")
- sql(
- """
- | CREATE TABLE datamap_test_cg(id INT, name STRING, city STRING, age INT)
- | STORED BY 'org.apache.carbondata.format'
- | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
- """.stripMargin)
- val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test_cg")
- // register datamap writer
- sql(s"create datamap cgdatamap on table datamap_test_cg using '${classOf[CGDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='name')")
- sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test_cg OPTIONS('header'='false')")
- checkAnswer(sql("select * from datamap_test_cg where name='n502670'"),
- sql("select * from normal_test where name='n502670'"))
- }
-
- test("test cg datamap with 2 datamaps ") {
- sql("DROP TABLE IF EXISTS datamap_test")
- sql(
- """
- | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
- | STORED BY 'org.apache.carbondata.format'
- | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
- """.stripMargin)
- val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test")
- // register datamap writer
- sql(s"create datamap ggdatamap1 on table datamap_test using '${classOf[CGDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='name')")
- sql(s"create datamap ggdatamap2 on table datamap_test using '${classOf[CGDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='city')")
- sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
- checkAnswer(sql("select * from datamap_test where name='n502670' and city='c2670'"),
- sql("select * from normal_test where name='n502670' and city='c2670'"))
- }
-
- override protected def afterAll(): Unit = {
- CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
- sql("DROP TABLE IF EXISTS normal_test")
- sql("DROP TABLE IF EXISTS datamap_test_cg")
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8ded96e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGIndexDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGIndexDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGIndexDataMapTestCase.scala
new file mode 100644
index 0000000..72a88ea
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGIndexDataMapTestCase.scala
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.datamap
+
+import java.io.{ByteArrayInputStream, DataOutputStream, ObjectInputStream, ObjectOutputStream}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMapModel}
+import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager, Segment}
+import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainIndexDataMap, AbstractCoarseGrainIndexDataMapFactory}
+import org.apache.carbondata.core.datastore.FileReader
+import org.apache.carbondata.core.datastore.block.SegmentProperties
+import org.apache.carbondata.core.datastore.compression.SnappyCompressor
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.datastore.page.ColumnPage
+import org.apache.carbondata.core.indexstore.{Blocklet, PartitionSpec}
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
+import org.apache.carbondata.core.scan.expression.Expression
+import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
+import org.apache.carbondata.core.util.ByteUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.Event
+import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlobalSortBigFileTest
+
+class CGIndexDataMapFactory extends AbstractCoarseGrainIndexDataMapFactory {
+ var identifier: AbsoluteTableIdentifier = _
+ var dataMapSchema: DataMapSchema = _
+
+ /**
+ * Initialization of Datamap factory with the identifier and datamap name
+ */
+ override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = {
+ this.identifier = identifier
+ this.dataMapSchema = dataMapSchema
+ }
+
+ /**
+ * Return a new writer for this datamap
+ */
+ override def createWriter(segment: Segment, dataWritePath: String): AbstractDataMapWriter = {
+ new CGDataMapWriter(identifier, segment, dataWritePath, dataMapSchema)
+ }
+
+ /**
+ * Get the datamaps for a segment id
+ */
+ override def getDataMaps(segment: Segment): java.util.List[AbstractCoarseGrainIndexDataMap] = {
+ val file = FileFactory.getCarbonFile(
+ CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo))
+
+ val files = file.listFiles(new CarbonFileFilter {
+ override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
+ })
+ files.map {f =>
+ val dataMap: AbstractCoarseGrainIndexDataMap = new CGIndexDataMap()
+ dataMap.init(new DataMapModel(f.getCanonicalPath))
+ dataMap
+ }.toList.asJava
+ }
+
+
+ /**
+ * Get datamaps for distributable object.
+ */
+ override def getDataMaps(
+ distributable: DataMapDistributable): java.util.List[AbstractCoarseGrainIndexDataMap] = {
+ val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable]
+ val dataMap: AbstractCoarseGrainIndexDataMap = new CGIndexDataMap()
+ dataMap.init(new DataMapModel(mapDistributable.getFilePath))
+ Seq(dataMap).asJava
+ }
+
+ /**
+ *
+ * @param event
+ */
+ override def fireEvent(event: Event): Unit = {
+ ???
+ }
+
+ /**
+ * Get all distributable objects of a segmentid
+ *
+ * @return
+ */
+ override def toDistributable(segment: Segment): java.util.List[DataMapDistributable] = {
+ val file = FileFactory.getCarbonFile(
+ CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo))
+
+ val files = file.listFiles(new CarbonFileFilter {
+ override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
+ })
+ files.map { f =>
+ val d:DataMapDistributable = new BlockletDataMapDistributable(f.getCanonicalPath)
+ d
+ }.toList.asJava
+ }
+
+
+ /**
+ * Clears datamap of the segment
+ */
+ override def clear(segment: Segment): Unit = {
+
+ }
+
+ /**
+ * Clear all datamaps from memory
+ */
+ override def clear(): Unit = {
+
+ }
+
+ /**
+ * Return metadata of this datamap
+ */
+ override def getMeta: DataMapMeta = {
+ new DataMapMeta(dataMapSchema.getProperties.get("indexcolumns").split(",").toList.asJava,
+ List(ExpressionType.EQUALS, ExpressionType.IN).asJava)
+ }
+}
+
+class CGIndexDataMap extends AbstractCoarseGrainIndexDataMap {
+
+ var maxMin: ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))] = _
+ var FileReader: FileReader = _
+ var filePath: String = _
+ val compressor = new SnappyCompressor
+
+ /**
+ * It is called to load the data map to memory or to initialize it.
+ */
+ override def init(dataMapModel: DataMapModel): Unit = {
+ this.filePath = dataMapModel.getFilePath
+ val size = FileFactory.getCarbonFile(filePath).getSize
+ FileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath))
+ val footerLen = FileReader.readInt(filePath, size-4)
+ val bytes = FileReader.readByteArray(filePath, size-footerLen-4, footerLen)
+ val in = new ByteArrayInputStream(compressor.unCompressByte(bytes))
+ val obj = new ObjectInputStream(in)
+ maxMin = obj.readObject().asInstanceOf[ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))]]
+ }
+
+ /**
+ * Prune the datamap with filter expression. It returns the list of
+ * blocklets where these filters can exist.
+ *
+ * @param filterExp
+ * @return
+ */
+ override def prune(
+ filterExp: FilterResolverIntf,
+ segmentProperties: SegmentProperties,
+ partitions: java.util.List[PartitionSpec]): java.util.List[Blocklet] = {
+ val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]()
+ val expression = filterExp.getFilterExpression
+ getEqualToExpression(expression, buffer)
+ val value = buffer.map { f =>
+ f.getChildren.get(1).evaluate(null).getString
+ }
+ val meta = findMeta(value(0).getBytes)
+ meta.map { f=>
+ new Blocklet(f._1, f._2 + "")
+ }.asJava
+ }
+
+
+ private def findMeta(value: Array[Byte]) = {
+ val tuples = maxMin.filter { f =>
+ ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._1) <= 0 &&
+ ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._2) >= 0
+ }
+ tuples
+ }
+
+ private def getEqualToExpression(expression: Expression, buffer: ArrayBuffer[Expression]): Unit = {
+ if (expression.isInstanceOf[EqualToExpression]) {
+ buffer += expression
+ } else {
+ if (expression.getChildren != null) {
+ expression.getChildren.asScala.map { f =>
+ if (f.isInstanceOf[EqualToExpression]) {
+ buffer += f
+ }
+ getEqualToExpression(f, buffer)
+ }
+ }
+ }
+ }
+
+ /**
+ * Clear complete index table and release memory.
+ */
+ override def clear() = {
+ ???
+ }
+
+ override def isScanRequired(filterExp: FilterResolverIntf): Boolean = ???
+}
+
+class CGDataMapWriter(identifier: AbsoluteTableIdentifier,
+ segment: Segment,
+ dataWritePath: String,
+ dataMapSchema: DataMapSchema)
+ extends AbstractDataMapWriter(identifier, segment, dataWritePath) {
+
+ var currentBlockId: String = null
+ val cgwritepath = dataWritePath + "/" +
+ dataMapSchema.getDataMapName + System.nanoTime() + ".datamap"
+ lazy val stream: DataOutputStream = FileFactory
+ .getDataOutputStream(cgwritepath, FileFactory.getFileType(cgwritepath))
+ val blockletList = new ArrayBuffer[Array[Byte]]()
+ val maxMin = new ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))]()
+ val compressor = new SnappyCompressor
+
+ /**
+ * Start of new block notification.
+ *
+ * @param blockId file name of the carbondata file
+ */
+ override def onBlockStart(blockId: String): Unit = {
+ currentBlockId = blockId
+ }
+
+ /**
+ * End of block notification
+ */
+ override def onBlockEnd(blockId: String): Unit = {
+
+ }
+
+ /**
+ * Start of new blocklet notification.
+ *
+ * @param blockletId sequence number of blocklet in the block
+ */
+ override def onBlockletStart(blockletId: Int): Unit = {
+
+ }
+
+ /**
+ * End of blocklet notification
+ *
+ * @param blockletId sequence number of blocklet in the block
+ */
+ override def onBlockletEnd(blockletId: Int): Unit = {
+ val sorted = blockletList
+ .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l, r) <= 0)
+ maxMin +=
+ ((currentBlockId+"", blockletId, (sorted.last, sorted.head)))
+ blockletList.clear()
+ }
+
+ /**
+ * Add the column page rows to the datamap. The order of pages is the same as `indexColumns` in
+ * the DataMapMeta returned by IndexDataMapFactory.
+ *
+ * Implementation should copy the content of `pages` as needed, because `pages` memory
+ * may be freed after this method returns, if using unsafe column page.
+ */
+ override def onPageAdded(blockletId: Int,
+ pageId: Int,
+ pages: Array[ColumnPage]): Unit = {
+ val size = pages(0).getPageSize
+ val list = new ArrayBuffer[Array[Byte]]()
+ var i = 0
+ while (i < size) {
+ val bytes = pages(0).getBytes(i)
+ val newBytes = new Array[Byte](bytes.length - 2)
+ System.arraycopy(bytes, 2, newBytes, 0, newBytes.length)
+ list += newBytes
+ i = i + 1
+ }
+ // Sort based on the column data in order to create index.
+ val sorted = list
+ .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l, r) <= 0)
+ blockletList += sorted.head
+ blockletList += sorted.last
+ }
+
+
+ /**
+ * This is called during closing of the writer, so after this call no more data will be sent to
+ * this class.
+ */
+ override def finish(): Unit = {
+ val out = new ByteOutputStream()
+ val outStream = new ObjectOutputStream(out)
+ outStream.writeObject(maxMin)
+ outStream.close()
+ val bytes = compressor.compressByte(out.getBytes)
+ stream.write(bytes)
+ stream.writeInt(bytes.length)
+ stream.close()
+ commitFile(cgwritepath)
+ }
+
+
+}
+
+class CGIndexDataMapTestCase extends QueryTest with BeforeAndAfterAll {
+
+ val file2 = resourcesPath + "/compaction/fil2.csv"
+ override protected def beforeAll(): Unit = {
+ //n should be about 5000000 of reset if size is default 1024
+ val n = 150000
+ CompactionSupportGlobalSortBigFileTest.createFile(file2, n * 4, n)
+ sql("DROP TABLE IF EXISTS normal_test")
+ sql(
+ """
+ | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+ """.stripMargin)
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')")
+ }
+
+ test("test cg datamap") {
+ sql("DROP TABLE IF EXISTS datamap_test_cg")
+ sql(
+ """
+ | CREATE TABLE datamap_test_cg(id INT, name STRING, city STRING, age INT)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+ """.stripMargin)
+ val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test_cg")
+ // register datamap writer
+ sql(s"create datamap cgdatamap on table datamap_test_cg using '${classOf[CGIndexDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='name')")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test_cg OPTIONS('header'='false')")
+ checkAnswer(sql("select * from datamap_test_cg where name='n502670'"),
+ sql("select * from normal_test where name='n502670'"))
+ }
+
+ test("test cg datamap with 2 datamaps ") {
+ sql("DROP TABLE IF EXISTS datamap_test")
+ sql(
+ """
+ | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+ """.stripMargin)
+ val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test")
+ // register datamap writer
+ sql(s"create datamap ggdatamap1 on table datamap_test using '${classOf[CGIndexDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='name')")
+ sql(s"create datamap ggdatamap2 on table datamap_test using '${classOf[CGIndexDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='city')")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
+ checkAnswer(sql("select * from datamap_test where name='n502670' and city='c2670'"),
+ sql("select * from normal_test where name='n502670' and city='c2670'"))
+ }
+
+ override protected def afterAll(): Unit = {
+ CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
+ sql("DROP TABLE IF EXISTS normal_test")
+ sql("DROP TABLE IF EXISTS datamap_test_cg")
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8ded96e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
deleted file mode 100644
index 00d13a9..0000000
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.testsuite.datamap
-
-import java.util
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.test.util.QueryTest
-import org.apache.spark.sql.{DataFrame, SaveMode}
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager, Segment}
-import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMap}
-import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainDataMap, AbstractCoarseGrainDataMapFactory}
-import org.apache.carbondata.core.datastore.page.ColumnPage
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.datatype.DataTypes
-import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
-import org.apache.carbondata.core.scan.filter.intf.ExpressionType
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.events.Event
-
-class C2DataMapFactory() extends AbstractCoarseGrainDataMapFactory {
-
- var identifier: AbsoluteTableIdentifier = _
-
- override def init(identifier: AbsoluteTableIdentifier,
- dataMapSchema: DataMapSchema): Unit = {
- this.identifier = identifier
- }
-
- override def fireEvent(event: Event): Unit = ???
-
- override def clear(segment: Segment): Unit = {}
-
- override def clear(): Unit = {}
-
- override def getDataMaps(distributable: DataMapDistributable): util.List[AbstractCoarseGrainDataMap] = ???
-
- override def getDataMaps(segment: Segment): util.List[AbstractCoarseGrainDataMap] = ???
-
- override def createWriter(segment: Segment, dataWritePath: String): AbstractDataMapWriter =
- DataMapWriterSuite.dataMapWriterC2Mock(identifier, segment, dataWritePath)
-
- override def getMeta: DataMapMeta = new DataMapMeta(List("c2").asJava, List(ExpressionType.EQUALS).asJava)
-
- /**
- * Get all distributable objects of a segmentid
- *
- * @return
- */
- override def toDistributable(segmentId: Segment): util.List[DataMapDistributable] = {
- ???
- }
-
-}
-
-class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll {
- def buildTestData(numRows: Int): DataFrame = {
- import sqlContext.implicits._
- sqlContext.sparkContext.parallelize(1 to numRows, 1)
- .map(x => ("a" + x, "b", x))
- .toDF("c1", "c2", "c3")
- }
-
- def dropTable(): Unit = {
- sql("DROP TABLE IF EXISTS carbon1")
- sql("DROP TABLE IF EXISTS carbon2")
- }
-
- override def beforeAll {
- dropTable()
- }
-
- test("test write datamap 2 pages") {
- sql(s"CREATE TABLE carbon1(c1 STRING, c2 STRING, c3 INT) STORED BY 'org.apache.carbondata.format'")
- // register datamap writer
- sql(s"CREATE DATAMAP test ON TABLE carbon1 USING '${classOf[C2DataMapFactory].getName}'")
- val df = buildTestData(33000)
-
- // save dataframe to carbon file
- df.write
- .format("carbondata")
- .option("tableName", "carbon1")
- .option("tempCSV", "false")
- .option("sort_columns","c1")
- .mode(SaveMode.Overwrite)
- .save()
-
- assert(DataMapWriterSuite.callbackSeq.head.contains("block start"))
- assert(DataMapWriterSuite.callbackSeq.last.contains("block end"))
- assert(
- DataMapWriterSuite.callbackSeq.slice(1, DataMapWriterSuite.callbackSeq.length - 1) == Seq(
- "blocklet start 0",
- "add page data: blocklet 0, page 0",
- "add page data: blocklet 0, page 1",
- "blocklet end: 0"
- ))
- DataMapWriterSuite.callbackSeq = Seq()
- }
-
- test("test write datamap 2 blocklet") {
- sql(s"CREATE TABLE carbon2(c1 STRING, c2 STRING, c3 INT) STORED BY 'org.apache.carbondata.format'")
- sql(s"CREATE DATAMAP test ON TABLE carbon2 USING '${classOf[C2DataMapFactory].getName}'")
-
- CarbonProperties.getInstance()
- .addProperty("carbon.blockletgroup.size.in.mb", "1")
- CarbonProperties.getInstance()
- .addProperty("carbon.number.of.cores.while.loading",
- CarbonCommonConstants.NUM_CORES_DEFAULT_VAL)
-
- val df = buildTestData(300000)
-
- // save dataframe to carbon file
- df.write
- .format("carbondata")
- .option("tableName", "carbon2")
- .option("tempCSV", "false")
- .option("sort_columns","c1")
- .option("SORT_SCOPE","GLOBAL_SORT")
- .mode(SaveMode.Overwrite)
- .save()
-
- assert(DataMapWriterSuite.callbackSeq.head.contains("block start"))
- assert(DataMapWriterSuite.callbackSeq.last.contains("block end"))
- // corrected test case the min "carbon.blockletgroup.size.in.mb" size could not be less than
- // 64 MB
- assert(
- DataMapWriterSuite.callbackSeq.slice(1, DataMapWriterSuite.callbackSeq.length - 1) == Seq(
- "blocklet start 0",
- "add page data: blocklet 0, page 0",
- "add page data: blocklet 0, page 1",
- "add page data: blocklet 0, page 2",
- "add page data: blocklet 0, page 3",
- "add page data: blocklet 0, page 4",
- "add page data: blocklet 0, page 5",
- "add page data: blocklet 0, page 6",
- "add page data: blocklet 0, page 7",
- "add page data: blocklet 0, page 8",
- "add page data: blocklet 0, page 9",
- "blocklet end: 0"
- ))
- DataMapWriterSuite.callbackSeq = Seq()
- }
-
- override def afterAll {
- dropTable()
- }
-}
-
-object DataMapWriterSuite {
-
- var callbackSeq: Seq[String] = Seq[String]()
-
- def dataMapWriterC2Mock(identifier: AbsoluteTableIdentifier, segment: Segment,
- dataWritePath: String) =
- new AbstractDataMapWriter(identifier, segment, dataWritePath) {
-
- override def onPageAdded(
- blockletId: Int,
- pageId: Int,
- pages: Array[ColumnPage]): Unit = {
- assert(pages.length == 1)
- assert(pages(0).getDataType == DataTypes.STRING)
- val bytes: Array[Byte] = pages(0).getByteArrayPage()(0)
- assert(bytes.sameElements(Seq(0, 1, 'b'.toByte)))
- callbackSeq :+= s"add page data: blocklet $blockletId, page $pageId"
- }
-
- override def onBlockletEnd(blockletId: Int): Unit = {
- callbackSeq :+= s"blocklet end: $blockletId"
- }
-
- override def onBlockEnd(blockId: String): Unit = {
- callbackSeq :+= s"block end $blockId"
- }
-
- override def onBlockletStart(blockletId: Int): Unit = {
- callbackSeq :+= s"blocklet start $blockletId"
- }
-
- /**
- * Start of new block notification.
- *
- * @param blockId file name of the carbondata file
- */
- override def onBlockStart(blockId: String) = {
- callbackSeq :+= s"block start $blockId"
- }
-
- /**
- * This is called during closing of writer.So after this call no more data will be sent to this
- * class.
- */
- override def finish() = {
-
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8ded96e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
deleted file mode 100644
index 90f0972..0000000
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++ /dev/null
@@ -1,473 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark.testsuite.datamap
-
-import java.io.{ByteArrayInputStream, DataOutputStream, ObjectInputStream, ObjectOutputStream}
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-
-import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream
-import org.apache.spark.sql.test.util.QueryTest
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.carbondata.core.datamap.dev.fgdatamap.{AbstractFineGrainDataMap, AbstractFineGrainDataMapFactory}
-import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMapModel}
-import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager, Segment}
-import org.apache.carbondata.core.datastore.FileReader
-import org.apache.carbondata.core.datastore.block.SegmentProperties
-import org.apache.carbondata.core.datastore.compression.SnappyCompressor
-import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.datastore.page.ColumnPage
-import org.apache.carbondata.core.indexstore.{Blocklet, FineGrainBlocklet, PartitionSpec}
-import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
-import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
-import org.apache.carbondata.core.scan.expression.Expression
-import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression
-import org.apache.carbondata.core.scan.filter.intf.ExpressionType
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
-import org.apache.carbondata.core.util.ByteUtil
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.events.Event
-import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlobalSortBigFileTest
-
-class FGDataMapFactory extends AbstractFineGrainDataMapFactory {
- var identifier: AbsoluteTableIdentifier = _
- var dataMapSchema: DataMapSchema = _
-
- /**
- * Initialization of Datamap factory with the identifier and datamap name
- */
- override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = {
- this.identifier = identifier
- this.dataMapSchema = dataMapSchema
- }
-
- /**
- * Return a new write for this datamap
- */
- override def createWriter(segment: Segment, dataWritePath: String): AbstractDataMapWriter = {
- new FGDataMapWriter(identifier, segment, dataWritePath, dataMapSchema)
- }
-
- /**
- * Get the datamap for segmentid
- */
- override def getDataMaps(segment: Segment): java.util.List[AbstractFineGrainDataMap] = {
- val file = FileFactory
- .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo))
-
- val files = file.listFiles(new CarbonFileFilter {
- override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
- })
- files.map { f =>
- val dataMap: AbstractFineGrainDataMap = new FGDataMap()
- dataMap.init(new DataMapModel(f.getCanonicalPath))
- dataMap
- }.toList.asJava
- }
-
- /**
- * Get datamap for distributable object.
- */
- override def getDataMaps(
- distributable: DataMapDistributable): java.util.List[AbstractFineGrainDataMap]= {
- val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable]
- val dataMap: AbstractFineGrainDataMap = new FGDataMap()
- dataMap.init(new DataMapModel(mapDistributable.getFilePath))
- Seq(dataMap).asJava
- }
-
- /**
- * Get all distributable objects of a segmentid
- *
- * @return
- */
- override def toDistributable(segment: Segment): java.util.List[DataMapDistributable] = {
- val file = FileFactory.getCarbonFile(
- CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo))
-
- val files = file.listFiles(new CarbonFileFilter {
- override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
- })
- files.map { f =>
- val d: DataMapDistributable = new BlockletDataMapDistributable(f.getCanonicalPath)
- d
- }.toList.asJava
- }
-
- /**
- *
- * @param event
- */
- override def fireEvent(event: Event):Unit = {
- ???
- }
-
- /**
- * Clears datamap of the segment
- */
- override def clear(segment: Segment): Unit = {
- }
-
- /**
- * Clear all datamaps from memory
- */
- override def clear(): Unit = {
- }
-
- /**
- * Return metadata of this datamap
- */
- override def getMeta: DataMapMeta = {
- new DataMapMeta(dataMapSchema.getProperties.get("indexcolumns").split(",").toList.asJava,
- List(ExpressionType.EQUALS, ExpressionType.IN).asJava)
- }
-}
-
-class FGDataMap extends AbstractFineGrainDataMap {
-
- var maxMin: ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)] = _
- var FileReader: FileReader = _
- var filePath: String = _
- val compressor = new SnappyCompressor
-
- /**
- * It is called to load the data map to memory or to initialize it.
- */
- override def init(dataMapModel: DataMapModel): Unit = {
- this.filePath = dataMapModel.getFilePath
- val size = FileFactory.getCarbonFile(filePath).getSize
- FileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath))
- val footerLen = FileReader.readInt(filePath, size - 4)
- val bytes = FileReader.readByteArray(filePath, size - footerLen - 4, footerLen)
- val in = new ByteArrayInputStream(compressor.unCompressByte(bytes))
- val obj = new ObjectInputStream(in)
- maxMin = obj.readObject()
- .asInstanceOf[ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)]]
- }
-
- /**
- * Prune the datamap with filter expression. It returns the list of
- * blocklets where these filters can exist.
- *
- * @param filterExp
- * @return
- */
- override def prune(
- filterExp: FilterResolverIntf,
- segmentProperties: SegmentProperties,
- partitions: java.util.List[PartitionSpec]): java.util.List[Blocklet] = {
- val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]()
- val expression = filterExp.getFilterExpression
- getEqualToExpression(expression, buffer)
- val value = buffer.map { f =>
- f.getChildren.get(1).evaluate(null).getString
- }
- val meta = findMeta(value(0).getBytes)
- meta.map { f =>
- readAndFindData(f, value(0).getBytes())
- }.filter(_.isDefined).map(_.get).asJava
- }
-
- private def readAndFindData(meta: (String, Int, (Array[Byte], Array[Byte]), Long, Int),
- value: Array[Byte]): Option[Blocklet] = {
- val bytes = FileReader.readByteArray(filePath, meta._4, meta._5)
- val outputStream = new ByteArrayInputStream(compressor.unCompressByte(bytes))
- val obj = new ObjectInputStream(outputStream)
- val blockletsData = obj.readObject()
- .asInstanceOf[ArrayBuffer[(Array[Byte], Seq[Seq[Int]], Seq[Int])]]
-
- import scala.collection.Searching._
- val searching = blockletsData
- .search[(Array[Byte], Seq[Seq[Int]], Seq[Int])]((value, Seq(Seq(0)), Seq(0)))(new Ordering[
- (Array[Byte], Seq[Seq[Int]], Seq[Int])] {
- override def compare(x: (Array[Byte], Seq[Seq[Int]], Seq[Int]),
- y: (Array[Byte], Seq[Seq[Int]], Seq[Int])) = {
- ByteUtil.UnsafeComparer.INSTANCE.compareTo(x._1, y._1)
- }
- })
- if (searching.insertionPoint >= 0) {
- val f = blockletsData(searching.insertionPoint)
- val pages = f._3.zipWithIndex.map { p =>
- val pg = new FineGrainBlocklet.Page
- pg.setPageId(p._1)
- pg.setRowId(f._2(p._2).toArray)
- pg
- }
- Some(new FineGrainBlocklet(meta._1, meta._2.toString, pages.toList.asJava))
- } else {
- None
- }
- }
-
- private def findMeta(value: Array[Byte]) = {
- val tuples = maxMin.filter { f =>
- ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._1) >= 0 &&
- ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._2) <= 0
- }
- tuples
- }
-
- def getEqualToExpression(expression: Expression, buffer: ArrayBuffer[Expression]): Unit = {
- if (expression.isInstanceOf[EqualToExpression]) {
- buffer += expression
- } else {
- if (expression.getChildren != null) {
- expression.getChildren.asScala.map { f =>
- if (f.isInstanceOf[EqualToExpression]) {
- buffer += f
- }
- getEqualToExpression(f, buffer)
- }
- }
- }
- }
-
- /**
- * Clear complete index table and release memory.
- */
- override def clear():Unit = {
- ???
- }
-
- override def isScanRequired(filterExp: FilterResolverIntf): Boolean = ???
-}
-
-class FGDataMapWriter(identifier: AbsoluteTableIdentifier,
- segment: Segment, dataWriterPath: String, dataMapSchema: DataMapSchema)
- extends AbstractDataMapWriter(identifier, segment, dataWriterPath) {
-
- var currentBlockId: String = null
- val fgwritepath = dataWriterPath + "/" + dataMapSchema.getDataMapName + System.nanoTime() +
- ".datamap"
- val stream: DataOutputStream = FileFactory
- .getDataOutputStream(fgwritepath, FileFactory.getFileType(fgwritepath))
- val blockletList = new ArrayBuffer[(Array[Byte], Seq[Int], Seq[Int])]()
- val maxMin = new ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)]()
- var position: Long = 0
- val compressor = new SnappyCompressor
-
- /**
- * Start of new block notification.
- *
- * @param blockId file name of the carbondata file
- */
- override def onBlockStart(blockId: String): Unit = {
- currentBlockId = blockId
- }
-
- /**
- * End of block notification
- */
- override def onBlockEnd(blockId: String): Unit = {
-
- }
-
- /**
- * Start of new blocklet notification.
- *
- * @param blockletId sequence number of blocklet in the block
- */
- override def onBlockletStart(blockletId: Int): Unit = {
-
- }
-
- /**
- * End of blocklet notification
- *
- * @param blockletId sequence number of blocklet in the block
- */
- override def onBlockletEnd(blockletId: Int): Unit = {
- val sorted = blockletList
- .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l._1, r._1) <= 0)
- var oldValue: (Array[Byte], Seq[Seq[Int]], Seq[Int]) = null
- var addedLast: Boolean = false
- val blockletListUpdated = new ArrayBuffer[(Array[Byte], Seq[Seq[Int]], Seq[Int])]()
- // Merge all same column values to single row.
- sorted.foreach { f =>
- if (oldValue != null) {
- if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(f._1, oldValue._1) == 0) {
- oldValue = (oldValue._1, oldValue._2 ++ Seq(f._2), oldValue._3 ++ f._3)
- addedLast = false
- } else {
- blockletListUpdated += oldValue
- oldValue = (f._1, Seq(f._2), f._3)
- addedLast = true
- }
- } else {
- oldValue = (f._1, Seq(f._2), f._3)
- addedLast = false
- }
- }
- if (!addedLast && oldValue != null) {
- blockletListUpdated += oldValue
- }
-
- val out = new ByteOutputStream()
- val outStream = new ObjectOutputStream(out)
- outStream.writeObject(blockletListUpdated)
- outStream.close()
- val bytes = compressor.compressByte(out.getBytes)
- stream.write(bytes)
- maxMin +=
- ((currentBlockId + "", blockletId, (blockletListUpdated.head._1, blockletListUpdated.last
- ._1), position, bytes.length))
- position += bytes.length
- blockletList.clear()
- }
-
- /**
- * Add the column pages row to the datamap, order of pages is same as `indexColumns` in
- * DataMapMeta returned in DataMapFactory.
- *
- * Implementation should copy the content of `pages` as needed, because `pages` memory
- * may be freed after this method returns, if using unsafe column page.
- */
- override def onPageAdded(blockletId: Int,
- pageId: Int,
- pages: Array[ColumnPage]): Unit = {
- val size = pages(0).getPageSize
- val list = new ArrayBuffer[(Array[Byte], Int)]()
- var i = 0
- while (i < size) {
- val bytes = pages(0).getBytes(i)
- val newBytes = new Array[Byte](bytes.length - 2)
- System.arraycopy(bytes, 2, newBytes, 0, newBytes.length)
- list += ((newBytes, i))
- i = i + 1
- }
- // Sort based on the column data in order to create index.
- val sorted = list
- .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l._1, r._1) <= 0)
- var oldValue: (Array[Byte], Seq[Int], Seq[Int]) = null
- var addedLast: Boolean = false
- // Merge all same column values to single row.
- sorted.foreach { f =>
- if (oldValue != null) {
- if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(f._1, oldValue._1) == 0) {
- oldValue = (oldValue._1, oldValue._2 ++ Seq(f._2), oldValue._3)
- addedLast = false
- } else {
- blockletList += oldValue
- oldValue = (f._1, Seq(f._2), Seq(pageId))
- addedLast = true
- }
- } else {
- oldValue = (f._1, Seq(f._2), Seq(pageId))
- addedLast = false
- }
- }
- if (!addedLast && oldValue != null) {
- blockletList += oldValue
- }
- }
-
-
- /**
- * This is called during closing of writer.So after this call no more data will be sent to this
- * class.
- */
- override def finish(): Unit = {
- val out = new ByteOutputStream()
- val outStream = new ObjectOutputStream(out)
- outStream.writeObject(maxMin)
- outStream.close()
- val bytes = compressor.compressByte(out.getBytes)
- stream.write(bytes)
- stream.writeInt(bytes.length)
- stream.close()
- commitFile(fgwritepath)
- }
-
-
-}
-
-class FGDataMapTestCase extends QueryTest with BeforeAndAfterAll {
-
- val file2 = resourcesPath + "/compaction/fil2.csv"
-
- override protected def beforeAll(): Unit = {
- //n should be about 5000000 of reset if size is default 1024
- val n = 150000
- CompactionSupportGlobalSortBigFileTest.createFile(file2, n * 4, n)
- sql("DROP TABLE IF EXISTS normal_test")
- sql(
- """
- | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT)
- | STORED BY 'org.apache.carbondata.format'
- | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
- """.stripMargin)
- sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')")
- }
-
- test("test fg datamap") {
- sql("DROP TABLE IF EXISTS datamap_test")
- sql(
- """
- | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
- | STORED BY 'org.apache.carbondata.format'
- | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
- """.stripMargin)
- val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test")
- // register datamap writer
- sql(
- s"""
- | CREATE DATAMAP ggdatamap ON TABLE datamap_test
- | USING '${classOf[FGDataMapFactory].getName}'
- | DMPROPERTIES('indexcolumns'='name')
- """.stripMargin)
- sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
- checkAnswer(sql("select * from datamap_test where name='n502670'"),
- sql("select * from normal_test where name='n502670'"))
- }
-
- test("test fg datamap with 2 datamaps ") {
- sql("DROP TABLE IF EXISTS datamap_test")
- sql(
- """
- | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
- | STORED BY 'org.apache.carbondata.format'
- | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
- """.stripMargin)
- val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test")
- // register datamap writer
- sql(
- s"""
- | CREATE DATAMAP ggdatamap1 ON TABLE datamap_test
- | USING '${classOf[FGDataMapFactory].getName}'
- | DMPROPERTIES('indexcolumns'='name')
- """.stripMargin)
- sql(
- s"""
- | CREATE DATAMAP ggdatamap2 ON TABLE datamap_test
- | USING '${classOf[FGDataMapFactory].getName}'
- | DMPROPERTIES('indexcolumns'='city')
- """.stripMargin)
- sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
- checkAnswer(sql("select * from datamap_test where name='n502670' and city='c2670'"),
- sql("select * from normal_test where name='n502670' and city='c2670'"))
- }
-
- override protected def afterAll(): Unit = {
- CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
- sql("DROP TABLE IF EXISTS normal_test")
- sql("DROP TABLE IF EXISTS datamap_test")
- }
-}
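The onBlockletEnd and onPageAdded loops in the writer above (and in the new FGDataMapWriter added below) sort the indexed column values and then collapse equal adjacent keys into a single entry that carries every matching row id. A minimal standalone sketch of that merge step, using plain String keys in place of the byte-array keys and CarbonData types from the patch (names here are illustrative only):

    // Sketch of the sort-and-merge step: equal keys collapse into one entry
    // holding all of their row ids.
    object MergeSketch {
      def mergeSorted(rows: Seq[(String, Int)]): Vector[(String, Seq[Int])] =
        rows.sortBy(_._1).foldLeft(Vector.empty[(String, Seq[Int])]) {
          case (acc, (key, rowId)) => acc.lastOption match {
            // same key as the previous entry: append this row id to it
            case Some((lastKey, rowIds)) if lastKey == key =>
              acc.init :+ ((lastKey, rowIds :+ rowId))
            // new key: start a new entry
            case _ => acc :+ ((key, Seq(rowId)))
          }
        }

      def main(args: Array[String]): Unit = {
        val pageRows = Seq(("n2", 0), ("n1", 1), ("n2", 2), ("n1", 3))
        // prints Vector((n1,List(1, 3)), (n2,List(0, 2)))
        println(mergeSorted(pageRows))
      }
    }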
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8ded96e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGIndexDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGIndexDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGIndexDataMapTestCase.scala
new file mode 100644
index 0000000..d92efe7
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGIndexDataMapTestCase.scala
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.datamap
+
+import java.io.{ByteArrayInputStream, DataOutputStream, ObjectInputStream, ObjectOutputStream}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.datamap.dev.fgdatamap.{AbstractFineGrainIndexDataMap, AbstractFineGrainIndexDataMapFactory}
+import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMapModel}
+import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager, Segment}
+import org.apache.carbondata.core.datastore.FileReader
+import org.apache.carbondata.core.datastore.block.SegmentProperties
+import org.apache.carbondata.core.datastore.compression.SnappyCompressor
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.datastore.page.ColumnPage
+import org.apache.carbondata.core.indexstore.{Blocklet, FineGrainBlocklet, PartitionSpec}
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
+import org.apache.carbondata.core.scan.expression.Expression
+import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
+import org.apache.carbondata.core.util.ByteUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.Event
+import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlobalSortBigFileTest
+
+class FGIndexDataMapFactory extends AbstractFineGrainIndexDataMapFactory {
+ var identifier: AbsoluteTableIdentifier = _
+ var dataMapSchema: DataMapSchema = _
+
+ /**
+ * Initialize the DataMap factory with the table identifier and the datamap schema
+ */
+ override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = {
+ this.identifier = identifier
+ this.dataMapSchema = dataMapSchema
+ }
+
+ /**
+ * Return a new writer for this datamap
+ */
+ override def createWriter(segment: Segment, dataWritePath: String): AbstractDataMapWriter = {
+ new FGDataMapWriter(identifier, segment, dataWritePath, dataMapSchema)
+ }
+
+ /**
+ * Get the datamaps of the given segment
+ */
+ override def getDataMaps(segment: Segment): java.util.List[AbstractFineGrainIndexDataMap] = {
+ val file = FileFactory
+ .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo))
+
+ val files = file.listFiles(new CarbonFileFilter {
+ override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
+ })
+ files.map { f =>
+ val dataMap: AbstractFineGrainIndexDataMap = new FGIndexDataMap()
+ dataMap.init(new DataMapModel(f.getCanonicalPath))
+ dataMap
+ }.toList.asJava
+ }
+
+ /**
+ * Get datamap for distributable object.
+ */
+ override def getDataMaps(
+ distributable: DataMapDistributable): java.util.List[AbstractFineGrainIndexDataMap] = {
+ val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable]
+ val dataMap: AbstractFineGrainIndexDataMap = new FGIndexDataMap()
+ dataMap.init(new DataMapModel(mapDistributable.getFilePath))
+ Seq(dataMap).asJava
+ }
+
+ /**
+ * Get all distributable objects of the given segment
+ *
+ * @return one distributable object per datamap file of the segment
+ */
+ override def toDistributable(segment: Segment): java.util.List[DataMapDistributable] = {
+ val file = FileFactory.getCarbonFile(
+ CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo))
+
+ val files = file.listFiles(new CarbonFileFilter {
+ override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
+ })
+ files.map { f =>
+ val d: DataMapDistributable = new BlockletDataMapDistributable(f.getCanonicalPath)
+ d
+ }.toList.asJava
+ }
+
+ /**
+ * Handle the fired event (not implemented in this example)
+ *
+ * @param event the fired event
+ */
+ override def fireEvent(event: Event): Unit = {
+ ???
+ }
+
+ /**
+ * Clears datamap of the segment
+ */
+ override def clear(segment: Segment): Unit = {
+ }
+
+ /**
+ * Clear all datamaps from memory
+ */
+ override def clear(): Unit = {
+ }
+
+ /**
+ * Return metadata of this datamap
+ */
+ override def getMeta: DataMapMeta = {
+ new DataMapMeta(dataMapSchema.getProperties.get("indexcolumns").split(",").toList.asJava,
+ List(ExpressionType.EQUALS, ExpressionType.IN).asJava)
+ }
+}
+
+class FGIndexDataMap extends AbstractFineGrainIndexDataMap {
+
+ var maxMin: ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)] = _
+ var FileReader: FileReader = _
+ var filePath: String = _
+ val compressor = new SnappyCompressor
+
+ /**
+ * Called to load the datamap into memory or to initialize it.
+ */
+ override def init(dataMapModel: DataMapModel): Unit = {
+ this.filePath = dataMapModel.getFilePath
+ val size = FileFactory.getCarbonFile(filePath).getSize
+ FileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath))
+ val footerLen = FileReader.readInt(filePath, size - 4)
+ val bytes = FileReader.readByteArray(filePath, size - footerLen - 4, footerLen)
+ val in = new ByteArrayInputStream(compressor.unCompressByte(bytes))
+ val obj = new ObjectInputStream(in)
+ maxMin = obj.readObject()
+ .asInstanceOf[ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)]]
+ }
+
+ /**
+ * Prune the datamap with the filter expression. It returns the list of
+ * blocklets where these filters can exist.
+ *
+ * @param filterExp resolved filter expression
+ * @return blocklets that may contain matching rows
+ */
+ override def prune(
+ filterExp: FilterResolverIntf,
+ segmentProperties: SegmentProperties,
+ partitions: java.util.List[PartitionSpec]): java.util.List[Blocklet] = {
+ val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]()
+ val expression = filterExp.getFilterExpression
+ getEqualToExpression(expression, buffer)
+ val value = buffer.map { f =>
+ f.getChildren.get(1).evaluate(null).getString
+ }
+ val meta = findMeta(value(0).getBytes)
+ meta.map { f =>
+ readAndFindData(f, value(0).getBytes())
+ }.filter(_.isDefined).map(_.get).asJava
+ }
+
+ private def readAndFindData(meta: (String, Int, (Array[Byte], Array[Byte]), Long, Int),
+ value: Array[Byte]): Option[Blocklet] = {
+ val bytes = FileReader.readByteArray(filePath, meta._4, meta._5)
+ val outputStream = new ByteArrayInputStream(compressor.unCompressByte(bytes))
+ val obj = new ObjectInputStream(outputStream)
+ val blockletsData = obj.readObject()
+ .asInstanceOf[ArrayBuffer[(Array[Byte], Seq[Seq[Int]], Seq[Int])]]
+
+ import scala.collection.Searching._
+ val searching = blockletsData
+ .search[(Array[Byte], Seq[Seq[Int]], Seq[Int])]((value, Seq(Seq(0)), Seq(0)))(new Ordering[
+ (Array[Byte], Seq[Seq[Int]], Seq[Int])] {
+ override def compare(x: (Array[Byte], Seq[Seq[Int]], Seq[Int]),
+ y: (Array[Byte], Seq[Seq[Int]], Seq[Int])) = {
+ ByteUtil.UnsafeComparer.INSTANCE.compareTo(x._1, y._1)
+ }
+ })
+ if (searching.insertionPoint >= 0) {
+ val f = blockletsData(searching.insertionPoint)
+ val pages = f._3.zipWithIndex.map { p =>
+ val pg = new FineGrainBlocklet.Page
+ pg.setPageId(p._1)
+ pg.setRowId(f._2(p._2).toArray)
+ pg
+ }
+ Some(new FineGrainBlocklet(meta._1, meta._2.toString, pages.toList.asJava))
+ } else {
+ None
+ }
+ }
+
+ private def findMeta(value: Array[Byte]) = {
+ val tuples = maxMin.filter { f =>
+ ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._1) >= 0 &&
+ ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._2) <= 0
+ }
+ tuples
+ }
+
+ def getEqualToExpression(expression: Expression, buffer: ArrayBuffer[Expression]): Unit = {
+ if (expression.isInstanceOf[EqualToExpression]) {
+ buffer += expression
+ } else {
+ if (expression.getChildren != null) {
+ expression.getChildren.asScala.map { f =>
+ if (f.isInstanceOf[EqualToExpression]) {
+ buffer += f
+ }
+ getEqualToExpression(f, buffer)
+ }
+ }
+ }
+ }
+
+ /**
+ * Clear complete index table and release memory.
+ */
+ override def clear(): Unit = {
+ ???
+ }
+
+ override def isScanRequired(filterExp: FilterResolverIntf): Boolean = ???
+}
+
+class FGDataMapWriter(identifier: AbsoluteTableIdentifier,
+ segment: Segment, dataWriterPath: String, dataMapSchema: DataMapSchema)
+ extends AbstractDataMapWriter(identifier, segment, dataWriterPath) {
+
+ var currentBlockId: String = null
+ val fgwritepath = dataWriterPath + "/" + dataMapSchema.getDataMapName + System.nanoTime() +
+ ".datamap"
+ val stream: DataOutputStream = FileFactory
+ .getDataOutputStream(fgwritepath, FileFactory.getFileType(fgwritepath))
+ val blockletList = new ArrayBuffer[(Array[Byte], Seq[Int], Seq[Int])]()
+ val maxMin = new ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)]()
+ var position: Long = 0
+ val compressor = new SnappyCompressor
+
+ /**
+ * Start of new block notification.
+ *
+ * @param blockId file name of the carbondata file
+ */
+ override def onBlockStart(blockId: String): Unit = {
+ currentBlockId = blockId
+ }
+
+ /**
+ * End of block notification
+ */
+ override def onBlockEnd(blockId: String): Unit = {
+
+ }
+
+ /**
+ * Start of new blocklet notification.
+ *
+ * @param blockletId sequence number of blocklet in the block
+ */
+ override def onBlockletStart(blockletId: Int): Unit = {
+
+ }
+
+ /**
+ * End of blocklet notification
+ *
+ * @param blockletId sequence number of blocklet in the block
+ */
+ override def onBlockletEnd(blockletId: Int): Unit = {
+ val sorted = blockletList
+ .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l._1, r._1) <= 0)
+ var oldValue: (Array[Byte], Seq[Seq[Int]], Seq[Int]) = null
+ var addedLast: Boolean = false
+ val blockletListUpdated = new ArrayBuffer[(Array[Byte], Seq[Seq[Int]], Seq[Int])]()
+ // Merge all same column values to single row.
+ sorted.foreach { f =>
+ if (oldValue != null) {
+ if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(f._1, oldValue._1) == 0) {
+ oldValue = (oldValue._1, oldValue._2 ++ Seq(f._2), oldValue._3 ++ f._3)
+ addedLast = false
+ } else {
+ blockletListUpdated += oldValue
+ oldValue = (f._1, Seq(f._2), f._3)
+ addedLast = true
+ }
+ } else {
+ oldValue = (f._1, Seq(f._2), f._3)
+ addedLast = false
+ }
+ }
+ if (!addedLast && oldValue != null) {
+ blockletListUpdated += oldValue
+ }
+
+ val out = new ByteOutputStream()
+ val outStream = new ObjectOutputStream(out)
+ outStream.writeObject(blockletListUpdated)
+ outStream.close()
+ val bytes = compressor.compressByte(out.getBytes)
+ stream.write(bytes)
+ maxMin +=
+ ((currentBlockId + "", blockletId, (blockletListUpdated.head._1, blockletListUpdated.last
+ ._1), position, bytes.length))
+ position += bytes.length
+ blockletList.clear()
+ }
+
+ /**
+ * Add a row of the column pages to the datamap. The order of pages is the same as
+ * `indexColumns` in the DataMapMeta returned by IndexDataMapFactory.
+ *
+ * Implementations should copy the content of `pages` as needed, because the `pages`
+ * memory may be freed after this method returns when unsafe column pages are used.
+ */
+ override def onPageAdded(blockletId: Int,
+ pageId: Int,
+ pages: Array[ColumnPage]): Unit = {
+ val size = pages(0).getPageSize
+ val list = new ArrayBuffer[(Array[Byte], Int)]()
+ var i = 0
+ while (i < size) {
+ val bytes = pages(0).getBytes(i)
+ val newBytes = new Array[Byte](bytes.length - 2)
+ System.arraycopy(bytes, 2, newBytes, 0, newBytes.length)
+ list += ((newBytes, i))
+ i = i + 1
+ }
+ // Sort based on the column data in order to create index.
+ val sorted = list
+ .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l._1, r._1) <= 0)
+ var oldValue: (Array[Byte], Seq[Int], Seq[Int]) = null
+ var addedLast: Boolean = false
+ // Merge all same column values to single row.
+ sorted.foreach { f =>
+ if (oldValue != null) {
+ if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(f._1, oldValue._1) == 0) {
+ oldValue = (oldValue._1, oldValue._2 ++ Seq(f._2), oldValue._3)
+ addedLast = false
+ } else {
+ blockletList += oldValue
+ oldValue = (f._1, Seq(f._2), Seq(pageId))
+ addedLast = true
+ }
+ } else {
+ oldValue = (f._1, Seq(f._2), Seq(pageId))
+ addedLast = false
+ }
+ }
+ if (!addedLast && oldValue != null) {
+ blockletList += oldValue
+ }
+ }
+
+
+ /**
+ * This is called when the writer is closed. After this call, no more data will be sent
+ * to this class.
+ */
+ override def finish(): Unit = {
+ val out = new ByteOutputStream()
+ val outStream = new ObjectOutputStream(out)
+ outStream.writeObject(maxMin)
+ outStream.close()
+ val bytes = compressor.compressByte(out.getBytes)
+ stream.write(bytes)
+ stream.writeInt(bytes.length)
+ stream.close()
+ commitFile(fgwritepath)
+ }
+}
+
+class FGIndexDataMapTestCase extends QueryTest with BeforeAndAfterAll {
+
+ val file2 = resourcesPath + "/compaction/fil2.csv"
+
+ override protected def beforeAll(): Unit = {
+ //n should be about 5000000 of reset if size is default 1024
+ val n = 150000
+ CompactionSupportGlobalSortBigFileTest.createFile(file2, n * 4, n)
+ sql("DROP TABLE IF EXISTS normal_test")
+ sql(
+ """
+ | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+ """.stripMargin)
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')")
+ }
+
+ test("test fg datamap") {
+ sql("DROP TABLE IF EXISTS datamap_test")
+ sql(
+ """
+ | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+ """.stripMargin)
+ val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test")
+ // register datamap writer
+ sql(
+ s"""
+ | CREATE DATAMAP ggdatamap ON TABLE datamap_test
+ | USING '${classOf[FGIndexDataMapFactory].getName}'
+ | DMPROPERTIES('indexcolumns'='name')
+ """.stripMargin)
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
+ checkAnswer(sql("select * from datamap_test where name='n502670'"),
+ sql("select * from normal_test where name='n502670'"))
+ }
+
+ test("test fg datamap with 2 datamaps ") {
+ sql("DROP TABLE IF EXISTS datamap_test")
+ sql(
+ """
+ | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+ """.stripMargin)
+ val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test")
+ // register datamap writer
+ sql(
+ s"""
+ | CREATE DATAMAP ggdatamap1 ON TABLE datamap_test
+ | USING '${classOf[FGIndexDataMapFactory].getName}'
+ | DMPROPERTIES('indexcolumns'='name')
+ """.stripMargin)
+ sql(
+ s"""
+ | CREATE DATAMAP ggdatamap2 ON TABLE datamap_test
+ | USING '${classOf[FGIndexDataMapFactory].getName}'
+ | DMPROPERTIES('indexcolumns'='city')
+ """.stripMargin)
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
+ checkAnswer(sql("select * from datamap_test where name='n502670' and city='c2670'"),
+ sql("select * from normal_test where name='n502670' and city='c2670'"))
+ }
+
+ override protected def afterAll(): Unit = {
+ CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
+ sql("DROP TABLE IF EXISTS normal_test")
+ sql("DROP TABLE IF EXISTS datamap_test")
+ }
+}
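As the FGDataMapWriter and FGIndexDataMap above show, the example datamap file ends with a serialized max/min footer followed by the footer length as a 4-byte int: finish() writes the footer and then writeInt(bytes.length), and init() reads the int at size - 4 to locate the footer. A minimal standalone sketch of that layout with plain JDK streams (no compression; illustrative names, not the CarbonData API):

    import java.io.{ByteArrayOutputStream, DataOutputStream}
    import java.nio.ByteBuffer

    object FooterLayoutSketch {
      // Write the data sections, then the footer bytes, then the footer length as a 4-byte int.
      def writeFile(sections: Seq[Array[Byte]], footer: Array[Byte]): Array[Byte] = {
        val bos = new ByteArrayOutputStream()
        val out = new DataOutputStream(bos)
        sections.foreach(s => out.write(s))
        out.write(footer)
        out.writeInt(footer.length)
        out.close()
        bos.toByteArray
      }

      // Read: the last 4 bytes give the footer length; the footer sits just before them.
      def readFooter(file: Array[Byte]): Array[Byte] = {
        val footerLen = ByteBuffer.wrap(file, file.length - 4, 4).getInt
        java.util.Arrays.copyOfRange(file, file.length - 4 - footerLen, file.length - 4)
      }

      def main(args: Array[String]): Unit = {
        val file = writeFile(Seq("blocklet0".getBytes, "blocklet1".getBytes), "maxmin-index".getBytes)
        println(new String(readFooter(file))) // prints maxmin-index
      }
    }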