Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/03/08 16:55:51 UTC

[50/54] [abbrv] carbondata git commit: [HOTFIX] Add java doc for datamap interface

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fc2a7eb3/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGIndexDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGIndexDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGIndexDataMapTestCase.scala
deleted file mode 100644
index 72a88ea..0000000
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGIndexDataMapTestCase.scala
+++ /dev/null
@@ -1,383 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark.testsuite.datamap
-
-import java.io.{ByteArrayInputStream, DataOutputStream, ObjectInputStream, ObjectOutputStream}
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-
-import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream
-import org.apache.spark.sql.test.util.QueryTest
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMapModel}
-import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager, Segment}
-import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
-import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMapModel}
-import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainIndexDataMap, AbstractCoarseGrainIndexDataMapFactory}
-import org.apache.carbondata.core.datastore.FileReader
-import org.apache.carbondata.core.datastore.block.SegmentProperties
-import org.apache.carbondata.core.datastore.compression.SnappyCompressor
-import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.datastore.page.ColumnPage
-import org.apache.carbondata.core.indexstore.{Blocklet, PartitionSpec}
-import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
-import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
-import org.apache.carbondata.core.scan.expression.Expression
-import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression
-import org.apache.carbondata.core.scan.filter.intf.ExpressionType
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
-import org.apache.carbondata.core.util.ByteUtil
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.events.Event
-import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlobalSortBigFileTest
-
-class CGIndexDataMapFactory extends AbstractCoarseGrainIndexDataMapFactory {
-  var identifier: AbsoluteTableIdentifier = _
-  var dataMapSchema: DataMapSchema = _
-
-  /**
-   * Initialization of Datamap factory with the identifier and datamap name
-   */
-  override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = {
-    this.identifier = identifier
-    this.dataMapSchema = dataMapSchema
-  }
-
-  /**
-   * Return a new write for this datamap
-   */
-  override def createWriter(segment: Segment, dataWritePath: String): AbstractDataMapWriter = {
-    new CGDataMapWriter(identifier, segment, dataWritePath, dataMapSchema)
-  }
-
-  /**
-   * Get the datamap for segmentid
-   */
-  override def getDataMaps(segment: Segment): java.util.List[AbstractCoarseGrainDataMap] = {
-    val file = FileFactory.getCarbonFile(
-      CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo))
-
-    val files = file.listFiles(new CarbonFileFilter {
-      override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
-    })
-    files.map {f =>
-      val dataMap: AbstractCoarseGrainIndexDataMap = new CGIndexDataMap()
-      dataMap.init(new DataMapModel(f.getCanonicalPath))
-      dataMap
-    }.toList.asJava
-  }
-
-
-  /**
-   * Get datamaps for distributable object.
-   */
-  override def getDataMaps(
-      distributable: DataMapDistributable): java.util.List[AbstractCoarseGrainIndexDataMap] = {
-    val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable]
-    val dataMap: AbstractCoarseGrainIndexDataMap = new CGIndexDataMap()
-    dataMap.init(new DataMapModel(mapDistributable.getFilePath))
-    Seq(dataMap).asJava
-  }
-
-  /**
-   *
-   * @param event
-   */
-  override def fireEvent(event: Event): Unit = {
-    ???
-  }
-
-  /**
-   * Get all distributable objects of a segmentid
-   *
-   * @return
-   */
-  override def toDistributable(segment: Segment): java.util.List[DataMapDistributable] = {
-    val file = FileFactory.getCarbonFile(
-      CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo))
-
-    val files = file.listFiles(new CarbonFileFilter {
-      override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
-    })
-    files.map { f =>
-      val d:DataMapDistributable = new BlockletDataMapDistributable(f.getCanonicalPath)
-      d
-    }.toList.asJava
-  }
-
-
-  /**
-   * Clears datamap of the segment
-   */
-  override def clear(segment: Segment): Unit = {
-
-  }
-
-  /**
-   * Clear all datamaps from memory
-   */
-  override def clear(): Unit = {
-
-  }
-
-  /**
-   * Return metadata of this datamap
-   */
-  override def getMeta: DataMapMeta = {
-    new DataMapMeta(dataMapSchema.getProperties.get("indexcolumns").split(",").toList.asJava,
-      List(ExpressionType.EQUALS, ExpressionType.IN).asJava)
-  }
-}
-
-class CGIndexDataMap extends AbstractCoarseGrainIndexDataMap {
-
-  var maxMin: ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))] = _
-  var FileReader: FileReader = _
-  var filePath: String = _
-  val compressor = new SnappyCompressor
-
-  /**
-   * It is called to load the data map to memory or to initialize it.
-   */
-  override def init(dataMapModel: DataMapModel): Unit = {
-    this.filePath = dataMapModel.getFilePath
-    val size = FileFactory.getCarbonFile(filePath).getSize
-    FileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath))
-    val footerLen = FileReader.readInt(filePath, size-4)
-    val bytes = FileReader.readByteArray(filePath, size-footerLen-4, footerLen)
-    val in = new ByteArrayInputStream(compressor.unCompressByte(bytes))
-    val obj = new ObjectInputStream(in)
-    maxMin = obj.readObject().asInstanceOf[ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))]]
-  }
-
-  /**
-   * Prune the datamap with filter expression. It returns the list of
-   * blocklets where these filters can exist.
-   *
-   * @param filterExp
-   * @return
-   */
-  override def prune(
-      filterExp: FilterResolverIntf,
-      segmentProperties: SegmentProperties,
-      partitions: java.util.List[PartitionSpec]): java.util.List[Blocklet] = {
-    val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]()
-    val expression = filterExp.getFilterExpression
-    getEqualToExpression(expression, buffer)
-    val value = buffer.map { f =>
-      f.getChildren.get(1).evaluate(null).getString
-    }
-    val meta = findMeta(value(0).getBytes)
-    meta.map { f=>
-      new Blocklet(f._1, f._2 + "")
-    }.asJava
-  }
-
-
-  private def findMeta(value: Array[Byte]) = {
-    val tuples = maxMin.filter { f =>
-      ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._1) <= 0 &&
-      ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._2) >= 0
-    }
-    tuples
-  }
-
-  private def getEqualToExpression(expression: Expression, buffer: ArrayBuffer[Expression]): Unit = {
-    if (expression.isInstanceOf[EqualToExpression]) {
-      buffer += expression
-    } else {
-      if (expression.getChildren != null) {
-        expression.getChildren.asScala.map { f =>
-          if (f.isInstanceOf[EqualToExpression]) {
-            buffer += f
-          }
-          getEqualToExpression(f, buffer)
-        }
-      }
-    }
-  }
-
-  /**
-   * Clear complete index table and release memory.
-   */
-  override def clear() = {
-    ???
-  }
-
-  override def isScanRequired(filterExp: FilterResolverIntf): Boolean = ???
-}
-
-class CGDataMapWriter(identifier: AbsoluteTableIdentifier,
-    segment: Segment,
-    dataWritePath: String,
-    dataMapSchema: DataMapSchema)
-  extends AbstractDataMapWriter(identifier, segment, dataWritePath) {
-
-  var currentBlockId: String = null
-  val cgwritepath = dataWritePath + "/" +
-                    dataMapSchema.getDataMapName + System.nanoTime() + ".datamap"
-  lazy val stream: DataOutputStream = FileFactory
-    .getDataOutputStream(cgwritepath, FileFactory.getFileType(cgwritepath))
-  val blockletList = new ArrayBuffer[Array[Byte]]()
-  val maxMin = new ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))]()
-  val compressor = new SnappyCompressor
-
-  /**
-   * Start of new block notification.
-   *
-   * @param blockId file name of the carbondata file
-   */
-  override def onBlockStart(blockId: String): Unit = {
-    currentBlockId = blockId
-  }
-
-  /**
-   * End of block notification
-   */
-  override def onBlockEnd(blockId: String): Unit = {
-
-  }
-
-  /**
-   * Start of new blocklet notification.
-   *
-   * @param blockletId sequence number of blocklet in the block
-   */
-  override def onBlockletStart(blockletId: Int): Unit = {
-
-  }
-
-  /**
-   * End of blocklet notification
-   *
-   * @param blockletId sequence number of blocklet in the block
-   */
-  override def onBlockletEnd(blockletId: Int): Unit = {
-    val sorted = blockletList
-      .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l, r) <= 0)
-    maxMin +=
-    ((currentBlockId+"", blockletId, (sorted.last, sorted.head)))
-    blockletList.clear()
-  }
-
-  /**
-   * Add the column pages row to the datamap, order of pages is same as `indexColumns` in
-   * DataMapMeta returned in IndexDataMapFactory.
-   *
-   * Implementation should copy the content of `pages` as needed, because `pages` memory
-   * may be freed after this method returns, if using unsafe column page.
-   */
-  override def onPageAdded(blockletId: Int,
-      pageId: Int,
-      pages: Array[ColumnPage]): Unit = {
-    val size = pages(0).getPageSize
-    val list = new ArrayBuffer[Array[Byte]]()
-    var i = 0
-    while (i < size) {
-      val bytes = pages(0).getBytes(i)
-      val newBytes = new Array[Byte](bytes.length - 2)
-      System.arraycopy(bytes, 2, newBytes, 0, newBytes.length)
-      list += newBytes
-      i = i + 1
-    }
-    // Sort based on the column data in order to create index.
-    val sorted = list
-      .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l, r) <= 0)
-    blockletList += sorted.head
-    blockletList += sorted.last
-  }
-
-
-  /**
-   * This is called during closing of writer.So after this call no more data will be sent to this
-   * class.
-   */
-  override def finish(): Unit = {
-    val out = new ByteOutputStream()
-    val outStream = new ObjectOutputStream(out)
-    outStream.writeObject(maxMin)
-    outStream.close()
-    val bytes = compressor.compressByte(out.getBytes)
-    stream.write(bytes)
-    stream.writeInt(bytes.length)
-    stream.close()
-    commitFile(cgwritepath)
-  }
-
-
-}
-
-class CGIndexDataMapTestCase extends QueryTest with BeforeAndAfterAll {
-
-  val file2 = resourcesPath + "/compaction/fil2.csv"
-  override protected def beforeAll(): Unit = {
-    //n should be about 5000000 of reset if size is default 1024
-    val n = 150000
-    CompactionSupportGlobalSortBigFileTest.createFile(file2, n * 4, n)
-    sql("DROP TABLE IF EXISTS normal_test")
-    sql(
-      """
-        | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT)
-        | STORED BY 'org.apache.carbondata.format'
-        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
-      """.stripMargin)
-    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')")
-  }
-
-  test("test cg datamap") {
-    sql("DROP TABLE IF EXISTS datamap_test_cg")
-    sql(
-      """
-        | CREATE TABLE datamap_test_cg(id INT, name STRING, city STRING, age INT)
-        | STORED BY 'org.apache.carbondata.format'
-        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
-      """.stripMargin)
-    val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test_cg")
-    // register datamap writer
-    sql(s"create datamap cgdatamap on table datamap_test_cg using '${classOf[CGIndexDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='name')")
-    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test_cg OPTIONS('header'='false')")
-    checkAnswer(sql("select * from datamap_test_cg where name='n502670'"),
-      sql("select * from normal_test where name='n502670'"))
-  }
-
-  test("test cg datamap with 2 datamaps ") {
-    sql("DROP TABLE IF EXISTS datamap_test")
-    sql(
-      """
-        | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
-        | STORED BY 'org.apache.carbondata.format'
-        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
-      """.stripMargin)
-    val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test")
-    // register datamap writer
-    sql(s"create datamap ggdatamap1 on table datamap_test using '${classOf[CGIndexDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='name')")
-    sql(s"create datamap ggdatamap2 on table datamap_test using '${classOf[CGIndexDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='city')")
-    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
-    checkAnswer(sql("select * from datamap_test where name='n502670' and city='c2670'"),
-      sql("select * from normal_test where name='n502670' and city='c2670'"))
-  }
-
-  override protected def afterAll(): Unit = {
-    CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
-    sql("DROP TABLE IF EXISTS normal_test")
-    sql("DROP TABLE IF EXISTS datamap_test_cg")
-  }
-}
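
The CG implementation removed above persists its index as a Java-serialized, Snappy-compressed ArrayBuffer of (blockId, blockletId, (max, min)) tuples, followed by a 4-byte footer holding the payload length; CGIndexDataMap.init() locates the payload by reading the last 4 bytes. The following is a minimal, self-contained sketch of that footer layout using only JDK streams; compression is omitted and the writeIndex/readIndex names are illustrative, not CarbonData APIs.

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataOutputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream, RandomAccessFile}

    object FooterLayoutSketch {

      // Mirrors CGDataMapWriter.finish(): write the serialized index, then a
      // 4-byte footer carrying its length (Snappy compression omitted here).
      def writeIndex(path: String, index: Serializable): Unit = {
        val buffer = new ByteArrayOutputStream()
        val objOut = new ObjectOutputStream(buffer)
        objOut.writeObject(index)
        objOut.close()
        val payload = buffer.toByteArray
        val out = new DataOutputStream(new FileOutputStream(path))
        try {
          out.write(payload)          // index payload
          out.writeInt(payload.length) // footer: payload length
        } finally {
          out.close()
        }
      }

      // Mirrors CGIndexDataMap.init(): read the footer length from the last
      // 4 bytes, then deserialize the payload located just before it.
      def readIndex(path: String): AnyRef = {
        val file = new RandomAccessFile(path, "r")
        try {
          val size = file.length()
          file.seek(size - 4)
          val footerLen = file.readInt()
          val payload = new Array[Byte](footerLen)
          file.seek(size - footerLen - 4)
          file.readFully(payload)
          new ObjectInputStream(new ByteArrayInputStream(payload)).readObject()
        } finally {
          file.close()
        }
      }
    }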

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fc2a7eb3/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
new file mode 100644
index 0000000..d5cae15
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.datamap
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.{DataFrame, SaveMode}
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager, Segment}
+import org.apache.carbondata.core.datamap.dev.DataMapWriter
+import org.apache.carbondata.core.datamap.dev.cgdatamap.{CoarseGrainDataMap, CoarseGrainDataMapFactory}
+import org.apache.carbondata.core.datastore.page.ColumnPage
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.events.Event
+
+class C2DataMapFactory() extends CoarseGrainDataMapFactory {
+
+  var identifier: AbsoluteTableIdentifier = _
+
+  override def init(identifier: AbsoluteTableIdentifier,
+      dataMapSchema: DataMapSchema): Unit = {
+    this.identifier = identifier
+  }
+
+  override def fireEvent(event: Event): Unit = ???
+
+  override def clear(segment: Segment): Unit = {}
+
+  override def clear(): Unit = {}
+
+  override def getDataMaps(distributable: DataMapDistributable): util.List[CoarseGrainDataMap] = ???
+
+  override def getDataMaps(segment: Segment): util.List[CoarseGrainDataMap] = ???
+
+  override def createWriter(segment: Segment, dataWritePath: String): DataMapWriter =
+    DataMapWriterSuite.dataMapWriterC2Mock(identifier, segment, dataWritePath)
+
+  override def getMeta: DataMapMeta = new DataMapMeta(List("c2").asJava, List(ExpressionType.EQUALS).asJava)
+
+  /**
+   * Get all distributable objects of a segmentid
+   *
+   * @return
+   */
+  override def toDistributable(segmentId: Segment): util.List[DataMapDistributable] = {
+    ???
+  }
+
+}
+
+class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll {
+  def buildTestData(numRows: Int): DataFrame = {
+    import sqlContext.implicits._
+    sqlContext.sparkContext.parallelize(1 to numRows, 1)
+      .map(x => ("a" + x, "b", x))
+      .toDF("c1", "c2", "c3")
+  }
+
+  def dropTable(): Unit = {
+    sql("DROP TABLE IF EXISTS carbon1")
+    sql("DROP TABLE IF EXISTS carbon2")
+  }
+
+  override def beforeAll {
+    dropTable()
+  }
+
+  test("test write datamap 2 pages") {
+    sql(s"CREATE TABLE carbon1(c1 STRING, c2 STRING, c3 INT) STORED BY 'org.apache.carbondata.format'")
+    // register datamap writer
+    sql(s"CREATE DATAMAP test ON TABLE carbon1 USING '${classOf[C2DataMapFactory].getName}'")
+    val df = buildTestData(33000)
+
+    // save dataframe to carbon file
+    df.write
+      .format("carbondata")
+      .option("tableName", "carbon1")
+      .option("tempCSV", "false")
+      .option("sort_columns","c1")
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    assert(DataMapWriterSuite.callbackSeq.head.contains("block start"))
+    assert(DataMapWriterSuite.callbackSeq.last.contains("block end"))
+    assert(
+      DataMapWriterSuite.callbackSeq.slice(1, DataMapWriterSuite.callbackSeq.length - 1) == Seq(
+        "blocklet start 0",
+        "add page data: blocklet 0, page 0",
+        "add page data: blocklet 0, page 1",
+        "blocklet end: 0"
+      ))
+    DataMapWriterSuite.callbackSeq = Seq()
+  }
+
+  test("test write datamap 2 blocklet") {
+    sql(s"CREATE TABLE carbon2(c1 STRING, c2 STRING, c3 INT) STORED BY 'org.apache.carbondata.format'")
+    sql(s"CREATE DATAMAP test ON TABLE carbon2 USING '${classOf[C2DataMapFactory].getName}'")
+
+    CarbonProperties.getInstance()
+      .addProperty("carbon.blockletgroup.size.in.mb", "1")
+    CarbonProperties.getInstance()
+      .addProperty("carbon.number.of.cores.while.loading",
+        CarbonCommonConstants.NUM_CORES_DEFAULT_VAL)
+
+    val df = buildTestData(300000)
+
+    // save dataframe to carbon file
+    df.write
+      .format("carbondata")
+      .option("tableName", "carbon2")
+      .option("tempCSV", "false")
+      .option("sort_columns","c1")
+      .option("SORT_SCOPE","GLOBAL_SORT")
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    assert(DataMapWriterSuite.callbackSeq.head.contains("block start"))
+    assert(DataMapWriterSuite.callbackSeq.last.contains("block end"))
+    // corrected test case: the minimum "carbon.blockletgroup.size.in.mb" cannot be less than
+    // 64 MB
+    assert(
+      DataMapWriterSuite.callbackSeq.slice(1, DataMapWriterSuite.callbackSeq.length - 1) == Seq(
+        "blocklet start 0",
+        "add page data: blocklet 0, page 0",
+        "add page data: blocklet 0, page 1",
+        "add page data: blocklet 0, page 2",
+        "add page data: blocklet 0, page 3",
+        "add page data: blocklet 0, page 4",
+        "add page data: blocklet 0, page 5",
+        "add page data: blocklet 0, page 6",
+        "add page data: blocklet 0, page 7",
+        "add page data: blocklet 0, page 8",
+        "add page data: blocklet 0, page 9",
+        "blocklet end: 0"
+      ))
+    DataMapWriterSuite.callbackSeq = Seq()
+  }
+
+  override def afterAll {
+    dropTable()
+  }
+}
+
+object DataMapWriterSuite {
+
+  var callbackSeq: Seq[String] = Seq[String]()
+
+  def dataMapWriterC2Mock(identifier: AbsoluteTableIdentifier, segment: Segment,
+      dataWritePath: String) =
+    new DataMapWriter(identifier, segment, dataWritePath) {
+
+    override def onPageAdded(
+        blockletId: Int,
+        pageId: Int,
+        pages: Array[ColumnPage]): Unit = {
+      assert(pages.length == 1)
+      assert(pages(0).getDataType == DataTypes.STRING)
+      val bytes: Array[Byte] = pages(0).getByteArrayPage()(0)
+      assert(bytes.sameElements(Seq(0, 1, 'b'.toByte)))
+      callbackSeq :+= s"add page data: blocklet $blockletId, page $pageId"
+    }
+
+    override def onBlockletEnd(blockletId: Int): Unit = {
+      callbackSeq :+= s"blocklet end: $blockletId"
+    }
+
+    override def onBlockEnd(blockId: String): Unit = {
+      callbackSeq :+= s"block end $blockId"
+    }
+
+    override def onBlockletStart(blockletId: Int): Unit = {
+      callbackSeq :+= s"blocklet start $blockletId"
+    }
+
+    /**
+     * Start of new block notification.
+     *
+     * @param blockId file name of the carbondata file
+     */
+    override def onBlockStart(blockId: String) = {
+      callbackSeq :+= s"block start $blockId"
+    }
+
+    /**
+     * This is called during the closing of the writer, so after this call no more data will be
+     * sent to this class.
+     */
+    override def finish() = {
+
+    }
+  }
+}
\ No newline at end of file
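
The mock above records each callback as a string so the tests can assert the order in which the load path drives the writer: block start, then for each blocklet a blocklet start, one onPageAdded per page and a blocklet end, followed by block end and finally finish(). A stripped-down sketch of that ordering follows; the trait and driver only mirror the callback names used above and are illustrative, not the CarbonData interface. The asserted page counts (2 pages for 33000 rows, 10 for 300000) appear consistent with CarbonData's default 32000-row page size.

    object WriterCallbackOrderSketch {

      // Callback names mirror the DataMapWriter methods exercised above,
      // but this trait is only an illustration, not the CarbonData interface.
      trait WriterCallbacks {
        def onBlockStart(blockId: String): Unit
        def onBlockletStart(blockletId: Int): Unit
        def onPageAdded(blockletId: Int, pageId: Int): Unit
        def onBlockletEnd(blockletId: Int): Unit
        def onBlockEnd(blockId: String): Unit
        def finish(): Unit
      }

      // The nesting the assertions above rely on: block start, then per
      // blocklet a start, its page adds and an end, then block end, finish.
      def drive(w: WriterCallbacks, blockId: String, blocklets: Int, pagesPerBlocklet: Int): Unit = {
        w.onBlockStart(blockId)
        for (blockletId <- 0 until blocklets) {
          w.onBlockletStart(blockletId)
          for (pageId <- 0 until pagesPerBlocklet) {
            w.onPageAdded(blockletId, pageId)
          }
          w.onBlockletEnd(blockletId)
        }
        w.onBlockEnd(blockId)
        w.finish()
      }
    }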

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fc2a7eb3/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
new file mode 100644
index 0000000..977b31d
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.datamap
+
+import java.io.{ByteArrayInputStream, DataOutputStream, ObjectInputStream, ObjectOutputStream}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager, Segment}
+import org.apache.carbondata.core.datamap.dev.{DataMapModel, DataMapWriter}
+import org.apache.carbondata.core.datamap.dev.fgdatamap.{FineGrainBlocklet, FineGrainDataMap, FineGrainDataMapFactory}
+import org.apache.carbondata.core.datastore.FileReader
+import org.apache.carbondata.core.datastore.block.SegmentProperties
+import org.apache.carbondata.core.datastore.compression.SnappyCompressor
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.datastore.page.ColumnPage
+import org.apache.carbondata.core.indexstore.{Blocklet, PartitionSpec}
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
+import org.apache.carbondata.core.scan.expression.Expression
+import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
+import org.apache.carbondata.core.util.ByteUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.Event
+import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlobalSortBigFileTest
+
+class FGDataMapFactory extends FineGrainDataMapFactory {
+  var identifier: AbsoluteTableIdentifier = _
+  var dataMapSchema: DataMapSchema = _
+
+  /**
+   * Initialization of Datamap factory with the identifier and datamap name
+   */
+  override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = {
+    this.identifier = identifier
+    this.dataMapSchema = dataMapSchema
+  }
+
+  /**
+   * Return a new writer for this datamap
+   */
+  override def createWriter(segment: Segment, dataWritePath: String): DataMapWriter = {
+    new FGDataMapWriter(identifier, segment, dataWritePath, dataMapSchema)
+  }
+
+  /**
+   * Get the datamap for segmentid
+   */
+  override def getDataMaps(segment: Segment): java.util.List[FineGrainDataMap] = {
+    val file = FileFactory
+      .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo))
+
+    val files = file.listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
+    })
+    files.map { f =>
+      val dataMap: FineGrainDataMap = new FGDataMap()
+      dataMap.init(new DataMapModel(f.getCanonicalPath))
+      dataMap
+    }.toList.asJava
+  }
+
+  /**
+   * Get datamap for distributable object.
+   */
+  override def getDataMaps(
+      distributable: DataMapDistributable): java.util.List[FineGrainDataMap]= {
+    val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable]
+    val dataMap: FineGrainDataMap = new FGDataMap()
+    dataMap.init(new DataMapModel(mapDistributable.getFilePath))
+    Seq(dataMap).asJava
+  }
+
+  /**
+   * Get all distributable objects of a segmentid
+   *
+   * @return
+   */
+  override def toDistributable(segment: Segment): java.util.List[DataMapDistributable] = {
+    val file = FileFactory.getCarbonFile(
+      CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo))
+
+    val files = file.listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
+    })
+    files.map { f =>
+      val d: DataMapDistributable = new BlockletDataMapDistributable(f.getCanonicalPath)
+      d
+    }.toList.asJava
+  }
+
+  /**
+   *
+   * @param event
+   */
+  override def fireEvent(event: Event):Unit = {
+    ???
+  }
+
+  /**
+   * Clears datamap of the segment
+   */
+  override def clear(segment: Segment): Unit = {
+  }
+
+  /**
+   * Clear all datamaps from memory
+   */
+  override def clear(): Unit = {
+  }
+
+  /**
+   * Return metadata of this datamap
+   */
+  override def getMeta: DataMapMeta = {
+    new DataMapMeta(dataMapSchema.getProperties.get("indexcolumns").split(",").toList.asJava,
+      List(ExpressionType.EQUALS, ExpressionType.IN).asJava)
+  }
+}
+
+class FGDataMap extends FineGrainDataMap {
+
+  var maxMin: ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)] = _
+  var FileReader: FileReader = _
+  var filePath: String = _
+  val compressor = new SnappyCompressor
+
+  /**
+   * It is called to load the data map to memory or to initialize it.
+   */
+  override def init(dataMapModel: DataMapModel): Unit = {
+    this.filePath = dataMapModel.getFilePath
+    val size = FileFactory.getCarbonFile(filePath).getSize
+    FileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath))
+    val footerLen = FileReader.readInt(filePath, size - 4)
+    val bytes = FileReader.readByteArray(filePath, size - footerLen - 4, footerLen)
+    val in = new ByteArrayInputStream(compressor.unCompressByte(bytes))
+    val obj = new ObjectInputStream(in)
+    maxMin = obj.readObject()
+      .asInstanceOf[ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)]]
+  }
+
+  /**
+   * Prune the datamap with filter expression. It returns the list of
+   * blocklets where these filters can exist.
+   *
+   * @param filterExp
+   * @return
+   */
+  override def prune(
+      filterExp: FilterResolverIntf,
+      segmentProperties: SegmentProperties,
+      partitions: java.util.List[PartitionSpec]): java.util.List[Blocklet] = {
+    val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]()
+    val expression = filterExp.getFilterExpression
+    getEqualToExpression(expression, buffer)
+    val value = buffer.map { f =>
+      f.getChildren.get(1).evaluate(null).getString
+    }
+    val meta = findMeta(value(0).getBytes)
+    meta.map { f =>
+      readAndFindData(f, value(0).getBytes())
+    }.filter(_.isDefined).map(_.get).asJava
+  }
+
+  private def readAndFindData(meta: (String, Int, (Array[Byte], Array[Byte]), Long, Int),
+      value: Array[Byte]): Option[Blocklet] = {
+    val bytes = FileReader.readByteArray(filePath, meta._4, meta._5)
+    val outputStream = new ByteArrayInputStream(compressor.unCompressByte(bytes))
+    val obj = new ObjectInputStream(outputStream)
+    val blockletsData = obj.readObject()
+      .asInstanceOf[ArrayBuffer[(Array[Byte], Seq[Seq[Int]], Seq[Int])]]
+
+    import scala.collection.Searching._
+    val searching = blockletsData
+      .search[(Array[Byte], Seq[Seq[Int]], Seq[Int])]((value, Seq(Seq(0)), Seq(0)))(new Ordering[
+      (Array[Byte], Seq[Seq[Int]], Seq[Int])] {
+      override def compare(x: (Array[Byte], Seq[Seq[Int]], Seq[Int]),
+          y: (Array[Byte], Seq[Seq[Int]], Seq[Int])) = {
+        ByteUtil.UnsafeComparer.INSTANCE.compareTo(x._1, y._1)
+      }
+    })
+    if (searching.insertionPoint >= 0) {
+      val f = blockletsData(searching.insertionPoint)
+      val pages = f._3.zipWithIndex.map { p =>
+        val pg = new FineGrainBlocklet.Page
+        pg.setPageId(p._1)
+        pg.setRowId(f._2(p._2).toArray)
+        pg
+      }
+      Some(new FineGrainBlocklet(meta._1, meta._2.toString, pages.toList.asJava))
+    } else {
+      None
+    }
+  }
+
+  private def findMeta(value: Array[Byte]) = {
+    val tuples = maxMin.filter { f =>
+      ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._1) >= 0 &&
+      ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._2) <= 0
+    }
+    tuples
+  }
+
+  def getEqualToExpression(expression: Expression, buffer: ArrayBuffer[Expression]): Unit = {
+    if (expression.isInstanceOf[EqualToExpression]) {
+      buffer += expression
+    } else {
+      if (expression.getChildren != null) {
+        expression.getChildren.asScala.map { f =>
+          if (f.isInstanceOf[EqualToExpression]) {
+            buffer += f
+          }
+          getEqualToExpression(f, buffer)
+        }
+      }
+    }
+  }
+
+  /**
+   * Clear complete index table and release memory.
+   */
+  override def clear():Unit = {
+    ???
+  }
+
+  override def isScanRequired(filterExp: FilterResolverIntf): Boolean = ???
+}
+
+class FGDataMapWriter(identifier: AbsoluteTableIdentifier,
+    segment: Segment, dataWriterPath: String, dataMapSchema: DataMapSchema)
+  extends DataMapWriter(identifier, segment, dataWriterPath) {
+
+  var currentBlockId: String = null
+  val fgwritepath = dataWriterPath + "/" + dataMapSchema.getDataMapName + System.nanoTime() +
+                    ".datamap"
+  val stream: DataOutputStream = FileFactory
+    .getDataOutputStream(fgwritepath, FileFactory.getFileType(fgwritepath))
+  val blockletList = new ArrayBuffer[(Array[Byte], Seq[Int], Seq[Int])]()
+  val maxMin = new ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)]()
+  var position: Long = 0
+  val compressor = new SnappyCompressor
+
+  /**
+   * Start of new block notification.
+   *
+   * @param blockId file name of the carbondata file
+   */
+  override def onBlockStart(blockId: String): Unit = {
+    currentBlockId = blockId
+  }
+
+  /**
+   * End of block notification
+   */
+  override def onBlockEnd(blockId: String): Unit = {
+
+  }
+
+  /**
+   * Start of new blocklet notification.
+   *
+   * @param blockletId sequence number of blocklet in the block
+   */
+  override def onBlockletStart(blockletId: Int): Unit = {
+
+  }
+
+  /**
+   * End of blocklet notification
+   *
+   * @param blockletId sequence number of blocklet in the block
+   */
+  override def onBlockletEnd(blockletId: Int): Unit = {
+    val sorted = blockletList
+      .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l._1, r._1) <= 0)
+    var oldValue: (Array[Byte], Seq[Seq[Int]], Seq[Int]) = null
+    var addedLast: Boolean = false
+    val blockletListUpdated = new ArrayBuffer[(Array[Byte], Seq[Seq[Int]], Seq[Int])]()
+    // Merge all same column values to single row.
+    sorted.foreach { f =>
+      if (oldValue != null) {
+        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(f._1, oldValue._1) == 0) {
+          oldValue = (oldValue._1, oldValue._2 ++ Seq(f._2), oldValue._3 ++ f._3)
+          addedLast = false
+        } else {
+          blockletListUpdated += oldValue
+          oldValue = (f._1, Seq(f._2), f._3)
+          addedLast = true
+        }
+      } else {
+        oldValue = (f._1, Seq(f._2), f._3)
+        addedLast = false
+      }
+    }
+    if (!addedLast && oldValue != null) {
+      blockletListUpdated += oldValue
+    }
+
+    val out = new ByteOutputStream()
+    val outStream = new ObjectOutputStream(out)
+    outStream.writeObject(blockletListUpdated)
+    outStream.close()
+    val bytes = compressor.compressByte(out.getBytes)
+    stream.write(bytes)
+    maxMin +=
+    ((currentBlockId + "", blockletId, (blockletListUpdated.head._1, blockletListUpdated.last
+      ._1), position, bytes.length))
+    position += bytes.length
+    blockletList.clear()
+  }
+
+  /**
+   * Add the rows of the column pages to the datamap; the order of pages is the same as
+   * `indexColumns` in the DataMapMeta returned by DataMapFactory.
+   *
+   * Implementation should copy the content of `pages` as needed, because `pages` memory
+   * may be freed after this method returns, if using unsafe column page.
+   */
+  override def onPageAdded(blockletId: Int,
+      pageId: Int,
+      pages: Array[ColumnPage]): Unit = {
+    val size = pages(0).getPageSize
+    val list = new ArrayBuffer[(Array[Byte], Int)]()
+    var i = 0
+    while (i < size) {
+      val bytes = pages(0).getBytes(i)
+      val newBytes = new Array[Byte](bytes.length - 2)
+      System.arraycopy(bytes, 2, newBytes, 0, newBytes.length)
+      list += ((newBytes, i))
+      i = i + 1
+    }
+    // Sort based on the column data in order to create index.
+    val sorted = list
+      .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l._1, r._1) <= 0)
+    var oldValue: (Array[Byte], Seq[Int], Seq[Int]) = null
+    var addedLast: Boolean = false
+    // Merge all same column values to single row.
+    sorted.foreach { f =>
+      if (oldValue != null) {
+        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(f._1, oldValue._1) == 0) {
+          oldValue = (oldValue._1, oldValue._2 ++ Seq(f._2), oldValue._3)
+          addedLast = false
+        } else {
+          blockletList += oldValue
+          oldValue = (f._1, Seq(f._2), Seq(pageId))
+          addedLast = true
+        }
+      } else {
+        oldValue = (f._1, Seq(f._2), Seq(pageId))
+        addedLast = false
+      }
+    }
+    if (!addedLast && oldValue != null) {
+      blockletList += oldValue
+    }
+  }
+
+
+  /**
+   * This is called during the closing of the writer, so after this call no more data will be
+   * sent to this class.
+   */
+  override def finish(): Unit = {
+    val out = new ByteOutputStream()
+    val outStream = new ObjectOutputStream(out)
+    outStream.writeObject(maxMin)
+    outStream.close()
+    val bytes = compressor.compressByte(out.getBytes)
+    stream.write(bytes)
+    stream.writeInt(bytes.length)
+    stream.close()
+    commitFile(fgwritepath)
+  }
+}
+
+class FGDataMapTestCase extends QueryTest with BeforeAndAfterAll {
+
+  val file2 = resourcesPath + "/compaction/fil2.csv"
+
+  override protected def beforeAll(): Unit = {
+    //n should be about 5000000 of reset if size is default 1024
+    val n = 150000
+    CompactionSupportGlobalSortBigFileTest.createFile(file2, n * 4, n)
+    sql("DROP TABLE IF EXISTS normal_test")
+    sql(
+      """
+        | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')")
+  }
+
+  test("test fg datamap") {
+    sql("DROP TABLE IF EXISTS datamap_test")
+    sql(
+      """
+        | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test")
+    // register datamap writer
+    sql(
+      s"""
+         | CREATE DATAMAP ggdatamap ON TABLE datamap_test
+         | USING '${classOf[FGDataMapFactory].getName}'
+         | DMPROPERTIES('indexcolumns'='name')
+       """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
+    checkAnswer(sql("select * from datamap_test where name='n502670'"),
+      sql("select * from normal_test where name='n502670'"))
+  }
+
+  test("test fg datamap with 2 datamaps ") {
+    sql("DROP TABLE IF EXISTS datamap_test")
+    sql(
+      """
+        | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test")
+    // register datamap writer
+    sql(
+      s"""
+         | CREATE DATAMAP ggdatamap1 ON TABLE datamap_test
+         | USING '${classOf[FGDataMapFactory].getName}'
+         | DMPROPERTIES('indexcolumns'='name')
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP ggdatamap2 ON TABLE datamap_test
+         | USING '${classOf[FGDataMapFactory].getName}'
+         | DMPROPERTIES('indexcolumns'='city')
+       """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
+    checkAnswer(sql("select * from datamap_test where name='n502670' and city='c2670'"),
+      sql("select * from normal_test where name='n502670' and city='c2670'"))
+  }
+
+  override protected def afterAll(): Unit = {
+    CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
+    sql("DROP TABLE IF EXISTS normal_test")
+    sql("DROP TABLE IF EXISTS datamap_test")
+  }
+}
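
Both FGDataMapWriter.onPageAdded and onBlockletEnd above collapse runs of equal column values in a sorted buffer into a single entry that carries all matching row ids, using oldValue/addedLast bookkeeping. The same merge can be sketched more directly as below; byte-array keys are replaced by String purely for readability, so this is an illustration of the idea rather than the writer's actual types.

    import scala.collection.mutable.ArrayBuffer

    object MergeSameValuesSketch {

      // Collapse a value-sorted sequence of (columnValue, rowId) pairs from one
      // page into (columnValue, rowIds, pageIds) entries, one per distinct value,
      // as FGDataMapWriter.onPageAdded does above with its oldValue/addedLast loop.
      def mergeSameValues(sorted: Seq[(String, Int)], pageId: Int): Seq[(String, Seq[Int], Seq[Int])] = {
        val merged = new ArrayBuffer[(String, Seq[Int], Seq[Int])]()
        sorted.foreach { case (value, rowId) =>
          if (merged.nonEmpty && merged.last._1 == value) {
            // Same value as the previous entry: append this row id to it.
            val last = merged.last
            merged(merged.length - 1) = (last._1, last._2 :+ rowId, last._3)
          } else {
            // New distinct value: start a fresh entry for the current page.
            merged += ((value, Seq(rowId), Seq(pageId)))
          }
        }
        merged.toList
      }

      // For example:
      // mergeSameValues(Seq(("a", 0), ("a", 3), ("b", 1)), pageId = 0)
      //   == List(("a", Seq(0, 3), Seq(0)), ("b", Seq(1), Seq(0)))
    }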

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fc2a7eb3/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGIndexDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGIndexDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGIndexDataMapTestCase.scala
deleted file mode 100644
index d92efe7..0000000
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGIndexDataMapTestCase.scala
+++ /dev/null
@@ -1,472 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark.testsuite.datamap
-
-import java.io.{ByteArrayInputStream, DataOutputStream, ObjectInputStream, ObjectOutputStream}
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-
-import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream
-import org.apache.spark.sql.test.util.QueryTest
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.carbondata.core.datamap.dev.fgdatamap.{AbstractFineGrainIndexDataMap, AbstractFineGrainIndexDataMapFactory}
-import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMapModel}
-import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager, Segment}
-import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta}
-import org.apache.carbondata.core.datastore.FileReader
-import org.apache.carbondata.core.datastore.block.SegmentProperties
-import org.apache.carbondata.core.datastore.compression.SnappyCompressor
-import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.datastore.page.ColumnPage
-import org.apache.carbondata.core.indexstore.{Blocklet, FineGrainBlocklet, PartitionSpec}
-import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
-import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
-import org.apache.carbondata.core.scan.expression.Expression
-import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression
-import org.apache.carbondata.core.scan.filter.intf.ExpressionType
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
-import org.apache.carbondata.core.util.ByteUtil
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.events.Event
-import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlobalSortBigFileTest
-
-class FGIndexDataMapFactory extends AbstractFineGrainIndexDataMapFactory {
-  var identifier: AbsoluteTableIdentifier = _
-  var dataMapSchema: DataMapSchema = _
-
-  /**
-   * Initialization of Datamap factory with the identifier and datamap name
-   */
-  override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = {
-    this.identifier = identifier
-    this.dataMapSchema = dataMapSchema
-  }
-
-  /**
-   * Return a new write for this datamap
-   */
-  override def createWriter(segment: Segment, dataWritePath: String): AbstractDataMapWriter = {
-    new FGDataMapWriter(identifier, segment, dataWritePath, dataMapSchema)
-  }
-
-  /**
-   * Get the datamap for segmentid
-   */
-  override def getDataMaps(segment: Segment): java.util.List[AbstractFineGrainIndexDataMap] = {
-    val file = FileFactory
-      .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo))
-
-    val files = file.listFiles(new CarbonFileFilter {
-      override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
-    })
-    files.map { f =>
-      val dataMap: AbstractFineGrainIndexDataMap = new FGIndexDataMap()
-      dataMap.init(new DataMapModel(f.getCanonicalPath))
-      dataMap
-    }.toList.asJava
-  }
-
-  /**
-   * Get datamap for distributable object.
-   */
-  override def getDataMaps(
-      distributable: DataMapDistributable): java.util.List[AbstractFineGrainIndexDataMap]= {
-    val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable]
-    val dataMap: AbstractFineGrainIndexDataMap = new FGIndexDataMap()
-    dataMap.init(new DataMapModel(mapDistributable.getFilePath))
-    Seq(dataMap).asJava
-  }
-
-  /**
-   * Get all distributable objects of a segmentid
-   *
-   * @return
-   */
-  override def toDistributable(segment: Segment): java.util.List[DataMapDistributable] = {
-    val file = FileFactory.getCarbonFile(
-      CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo))
-
-    val files = file.listFiles(new CarbonFileFilter {
-      override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
-    })
-    files.map { f =>
-      val d: DataMapDistributable = new BlockletDataMapDistributable(f.getCanonicalPath)
-      d
-    }.toList.asJava
-  }
-
-  /**
-   *
-   * @param event
-   */
-  override def fireEvent(event: Event):Unit = {
-    ???
-  }
-
-  /**
-   * Clears datamap of the segment
-   */
-  override def clear(segment: Segment): Unit = {
-  }
-
-  /**
-   * Clear all datamaps from memory
-   */
-  override def clear(): Unit = {
-  }
-
-  /**
-   * Return metadata of this datamap
-   */
-  override def getMeta: DataMapMeta = {
-    new DataMapMeta(dataMapSchema.getProperties.get("indexcolumns").split(",").toList.asJava,
-      List(ExpressionType.EQUALS, ExpressionType.IN).asJava)
-  }
-}
-
-class FGIndexDataMap extends AbstractFineGrainIndexDataMap {
-
-  var maxMin: ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)] = _
-  var FileReader: FileReader = _
-  var filePath: String = _
-  val compressor = new SnappyCompressor
-
-  /**
-   * It is called to load the data map to memory or to initialize it.
-   */
-  override def init(dataMapModel: DataMapModel): Unit = {
-    this.filePath = dataMapModel.getFilePath
-    val size = FileFactory.getCarbonFile(filePath).getSize
-    FileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath))
-    val footerLen = FileReader.readInt(filePath, size - 4)
-    val bytes = FileReader.readByteArray(filePath, size - footerLen - 4, footerLen)
-    val in = new ByteArrayInputStream(compressor.unCompressByte(bytes))
-    val obj = new ObjectInputStream(in)
-    maxMin = obj.readObject()
-      .asInstanceOf[ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)]]
-  }
-
-  /**
-   * Prune the datamap with filter expression. It returns the list of
-   * blocklets where these filters can exist.
-   *
-   * @param filterExp
-   * @return
-   */
-  override def prune(
-      filterExp: FilterResolverIntf,
-      segmentProperties: SegmentProperties,
-      partitions: java.util.List[PartitionSpec]): java.util.List[Blocklet] = {
-    val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]()
-    val expression = filterExp.getFilterExpression
-    getEqualToExpression(expression, buffer)
-    val value = buffer.map { f =>
-      f.getChildren.get(1).evaluate(null).getString
-    }
-    val meta = findMeta(value(0).getBytes)
-    meta.map { f =>
-      readAndFindData(f, value(0).getBytes())
-    }.filter(_.isDefined).map(_.get).asJava
-  }
-
-  private def readAndFindData(meta: (String, Int, (Array[Byte], Array[Byte]), Long, Int),
-      value: Array[Byte]): Option[Blocklet] = {
-    val bytes = FileReader.readByteArray(filePath, meta._4, meta._5)
-    val outputStream = new ByteArrayInputStream(compressor.unCompressByte(bytes))
-    val obj = new ObjectInputStream(outputStream)
-    val blockletsData = obj.readObject()
-      .asInstanceOf[ArrayBuffer[(Array[Byte], Seq[Seq[Int]], Seq[Int])]]
-
-    import scala.collection.Searching._
-    val searching = blockletsData
-      .search[(Array[Byte], Seq[Seq[Int]], Seq[Int])]((value, Seq(Seq(0)), Seq(0)))(new Ordering[
-      (Array[Byte], Seq[Seq[Int]], Seq[Int])] {
-      override def compare(x: (Array[Byte], Seq[Seq[Int]], Seq[Int]),
-          y: (Array[Byte], Seq[Seq[Int]], Seq[Int])) = {
-        ByteUtil.UnsafeComparer.INSTANCE.compareTo(x._1, y._1)
-      }
-    })
-    if (searching.insertionPoint >= 0) {
-      val f = blockletsData(searching.insertionPoint)
-      val pages = f._3.zipWithIndex.map { p =>
-        val pg = new FineGrainBlocklet.Page
-        pg.setPageId(p._1)
-        pg.setRowId(f._2(p._2).toArray)
-        pg
-      }
-      Some(new FineGrainBlocklet(meta._1, meta._2.toString, pages.toList.asJava))
-    } else {
-      None
-    }
-  }
-
-  private def findMeta(value: Array[Byte]) = {
-    val tuples = maxMin.filter { f =>
-      ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._1) >= 0 &&
-      ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._2) <= 0
-    }
-    tuples
-  }
-
-  def getEqualToExpression(expression: Expression, buffer: ArrayBuffer[Expression]): Unit = {
-    if (expression.isInstanceOf[EqualToExpression]) {
-      buffer += expression
-    } else {
-      if (expression.getChildren != null) {
-        expression.getChildren.asScala.map { f =>
-          if (f.isInstanceOf[EqualToExpression]) {
-            buffer += f
-          }
-          getEqualToExpression(f, buffer)
-        }
-      }
-    }
-  }
-
-  /**
-   * Clear complete index table and release memory.
-   */
-  override def clear():Unit = {
-    ???
-  }
-
-  override def isScanRequired(filterExp: FilterResolverIntf): Boolean = ???
-}
-
-class FGDataMapWriter(identifier: AbsoluteTableIdentifier,
-    segment: Segment, dataWriterPath: String, dataMapSchema: DataMapSchema)
-  extends AbstractDataMapWriter(identifier, segment, dataWriterPath) {
-
-  var currentBlockId: String = null
-  val fgwritepath = dataWriterPath + "/" + dataMapSchema.getDataMapName + System.nanoTime() +
-                    ".datamap"
-  val stream: DataOutputStream = FileFactory
-    .getDataOutputStream(fgwritepath, FileFactory.getFileType(fgwritepath))
-  val blockletList = new ArrayBuffer[(Array[Byte], Seq[Int], Seq[Int])]()
-  val maxMin = new ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)]()
-  var position: Long = 0
-  val compressor = new SnappyCompressor
-
-  /**
-   * Start of new block notification.
-   *
-   * @param blockId file name of the carbondata file
-   */
-  override def onBlockStart(blockId: String): Unit = {
-    currentBlockId = blockId
-  }
-
-  /**
-   * End of block notification
-   */
-  override def onBlockEnd(blockId: String): Unit = {
-
-  }
-
-  /**
-   * Start of new blocklet notification.
-   *
-   * @param blockletId sequence number of blocklet in the block
-   */
-  override def onBlockletStart(blockletId: Int): Unit = {
-
-  }
-
-  /**
-   * End of blocklet notification
-   *
-   * @param blockletId sequence number of blocklet in the block
-   */
-  override def onBlockletEnd(blockletId: Int): Unit = {
-    val sorted = blockletList
-      .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l._1, r._1) <= 0)
-    var oldValue: (Array[Byte], Seq[Seq[Int]], Seq[Int]) = null
-    var addedLast: Boolean = false
-    val blockletListUpdated = new ArrayBuffer[(Array[Byte], Seq[Seq[Int]], Seq[Int])]()
-    // Merge all same column values to single row.
-    sorted.foreach { f =>
-      if (oldValue != null) {
-        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(f._1, oldValue._1) == 0) {
-          oldValue = (oldValue._1, oldValue._2 ++ Seq(f._2), oldValue._3 ++ f._3)
-          addedLast = false
-        } else {
-          blockletListUpdated += oldValue
-          oldValue = (f._1, Seq(f._2), f._3)
-          addedLast = true
-        }
-      } else {
-        oldValue = (f._1, Seq(f._2), f._3)
-        addedLast = false
-      }
-    }
-    if (!addedLast && oldValue != null) {
-      blockletListUpdated += oldValue
-    }
-
-    val out = new ByteOutputStream()
-    val outStream = new ObjectOutputStream(out)
-    outStream.writeObject(blockletListUpdated)
-    outStream.close()
-    val bytes = compressor.compressByte(out.getBytes)
-    stream.write(bytes)
-    maxMin +=
-    ((currentBlockId + "", blockletId, (blockletListUpdated.head._1, blockletListUpdated.last
-      ._1), position, bytes.length))
-    position += bytes.length
-    blockletList.clear()
-  }
-
-  /**
-   * Add the column pages row to the datamap, order of pages is same as `indexColumns` in
-   * DataMapMeta returned in IndexDataMapFactory.
-   *
-   * Implementation should copy the content of `pages` as needed, because `pages` memory
-   * may be freed after this method returns, if using unsafe column page.
-   */
-  override def onPageAdded(blockletId: Int,
-      pageId: Int,
-      pages: Array[ColumnPage]): Unit = {
-    val size = pages(0).getPageSize
-    val list = new ArrayBuffer[(Array[Byte], Int)]()
-    var i = 0
-    while (i < size) {
-      val bytes = pages(0).getBytes(i)
-      val newBytes = new Array[Byte](bytes.length - 2)
-      System.arraycopy(bytes, 2, newBytes, 0, newBytes.length)
-      list += ((newBytes, i))
-      i = i + 1
-    }
-    // Sort based on the column data in order to create index.
-    val sorted = list
-      .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l._1, r._1) <= 0)
-    var oldValue: (Array[Byte], Seq[Int], Seq[Int]) = null
-    var addedLast: Boolean = false
-    // Merge all same column values to single row.
-    sorted.foreach { f =>
-      if (oldValue != null) {
-        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(f._1, oldValue._1) == 0) {
-          oldValue = (oldValue._1, oldValue._2 ++ Seq(f._2), oldValue._3)
-          addedLast = false
-        } else {
-          blockletList += oldValue
-          oldValue = (f._1, Seq(f._2), Seq(pageId))
-          addedLast = true
-        }
-      } else {
-        oldValue = (f._1, Seq(f._2), Seq(pageId))
-        addedLast = false
-      }
-    }
-    if (!addedLast && oldValue != null) {
-      blockletList += oldValue
-    }
-  }
-
-
-  /**
-   * This is called while the writer is being closed, so no more data will be sent to this
-   * class after this call.
-   */
-  override def finish(): Unit = {
-    val out = new ByteOutputStream()
-    val outStream = new ObjectOutputStream(out)
-    outStream.writeObject(maxMin)
-    outStream.close()
-    val bytes = compressor.compressByte(out.getBytes)
-    stream.write(bytes)
-    stream.writeInt(bytes.length)
-    stream.close()
-    commitFile(fgwritepath)
-  }
-}
-
-class FGIndexDataMapTestCase extends QueryTest with BeforeAndAfterAll {
-
-  val file2 = resourcesPath + "/compaction/fil2.csv"
-
-  override protected def beforeAll(): Unit = {
-    //n should be about 5000000 of reset if size is default 1024
-    val n = 150000
-    CompactionSupportGlobalSortBigFileTest.createFile(file2, n * 4, n)
-    sql("DROP TABLE IF EXISTS normal_test")
-    sql(
-      """
-        | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT)
-        | STORED BY 'org.apache.carbondata.format'
-        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
-      """.stripMargin)
-    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')")
-  }
-
-  test("test fg datamap") {
-    sql("DROP TABLE IF EXISTS datamap_test")
-    sql(
-      """
-        | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
-        | STORED BY 'org.apache.carbondata.format'
-        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
-      """.stripMargin)
-    val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test")
-    // register datamap writer
-    sql(
-      s"""
-         | CREATE DATAMAP ggdatamap ON TABLE datamap_test
-         | USING '${classOf[FGIndexDataMapFactory].getName}'
-         | DMPROPERTIES('indexcolumns'='name')
-       """.stripMargin)
-    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
-    checkAnswer(sql("select * from datamap_test where name='n502670'"),
-      sql("select * from normal_test where name='n502670'"))
-  }
-
-  test("test fg datamap with 2 datamaps ") {
-    sql("DROP TABLE IF EXISTS datamap_test")
-    sql(
-      """
-        | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
-        | STORED BY 'org.apache.carbondata.format'
-        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
-      """.stripMargin)
-    val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test")
-    // register datamap writer
-    sql(
-      s"""
-         | CREATE DATAMAP ggdatamap1 ON TABLE datamap_test
-         | USING '${classOf[FGIndexDataMapFactory].getName}'
-         | DMPROPERTIES('indexcolumns'='name')
-       """.stripMargin)
-    sql(
-      s"""
-         | CREATE DATAMAP ggdatamap2 ON TABLE datamap_test
-         | USING '${classOf[FGIndexDataMapFactory].getName}'
-         | DMPROPERTIES('indexcolumns'='city')
-       """.stripMargin)
-    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
-    checkAnswer(sql("select * from datamap_test where name='n502670' and city='c2670'"),
-      sql("select * from normal_test where name='n502670' and city='c2670'"))
-  }
-
-  override protected def afterAll(): Unit = {
-    CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
-    sql("DROP TABLE IF EXISTS normal_test")
-    sql("DROP TABLE IF EXISTS datamap_test")
-  }
-}
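
The core of onPageAdded and onBlockletEnd in the deleted FG writer above is a sort-then-merge
over (column value, row id) pairs, so that rows sharing the same indexed value collapse into a
single index entry. Below is a minimal standalone sketch of that pattern, assuming only the Scala
standard library; MergeSortedKeys and compareBytes are hypothetical helpers, and compareBytes
merely stands in for the ByteUtil.UnsafeComparer used by the real code.

import java.util.Arrays

import scala.collection.mutable.ArrayBuffer

object MergeSortedKeys {

  // Sort (columnValue, rowId) pairs by the raw bytes of the value and merge entries that
  // share the same value, so each distinct value keeps the list of its row ids.
  def mergeByKey(entries: Seq[(Array[Byte], Int)]): Seq[(Array[Byte], Seq[Int])] = {
    val sorted = entries.sortWith((l, r) => compareBytes(l._1, r._1) < 0)
    val merged = new ArrayBuffer[(Array[Byte], Seq[Int])]()
    sorted.foreach { case (value, rowId) =>
      if (merged.nonEmpty && Arrays.equals(merged.last._1, value)) {
        val (v, rows) = merged.remove(merged.length - 1)
        merged += ((v, rows :+ rowId))
      } else {
        merged += ((value, Seq(rowId)))
      }
    }
    merged
  }

  // Unsigned lexicographic byte comparison, a self-contained stand-in for
  // ByteUtil.UnsafeComparer.INSTANCE.compareTo.
  private def compareBytes(a: Array[Byte], b: Array[Byte]): Int = {
    var i = 0
    while (i < a.length && i < b.length) {
      val cmp = (a(i) & 0xFF) - (b(i) & 0xFF)
      if (cmp != 0) return cmp
      i += 1
    }
    a.length - b.length
  }

  def main(args: Array[String]): Unit = {
    val entries = Seq("n502670", "c2670", "n502670").map(_.getBytes("UTF-8")).zipWithIndex
    mergeByKey(entries).foreach { case (v, rows) =>
      println(new String(v, "UTF-8") + " -> rows " + rows.mkString(","))
    }
  }
}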

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fc2a7eb3/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/IndexDataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/IndexDataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/IndexDataMapWriterSuite.scala
deleted file mode 100644
index 795ef6a..0000000
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/IndexDataMapWriterSuite.scala
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.testsuite.datamap
-
-import java.util
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.test.util.QueryTest
-import org.apache.spark.sql.{DataFrame, SaveMode}
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager, Segment}
-import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter
-import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainIndexDataMap, AbstractCoarseGrainIndexDataMapFactory}
-import org.apache.carbondata.core.datastore.page.ColumnPage
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.datatype.DataTypes
-import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
-import org.apache.carbondata.core.scan.filter.intf.ExpressionType
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.events.Event
-
-class C2IndexDataMapFactory() extends AbstractCoarseGrainIndexDataMapFactory {
-
-  var identifier: AbsoluteTableIdentifier = _
-
-  override def init(identifier: AbsoluteTableIdentifier,
-      dataMapSchema: DataMapSchema): Unit = {
-    this.identifier = identifier
-  }
-
-  override def fireEvent(event: Event): Unit = ???
-
-  override def clear(segment: Segment): Unit = {}
-
-  override def clear(): Unit = {}
-
-  override def getDataMaps(distributable: DataMapDistributable): util.List[AbstractCoarseGrainIndexDataMap] = ???
-
-  override def getDataMaps(segment: Segment): util.List[AbstractCoarseGrainIndexDataMap] = ???
-
-  override def createWriter(segment: Segment, dataWritePath: String): AbstractDataMapWriter =
-    IndexDataMapWriterSuite.dataMapWriterC2Mock(identifier, segment, dataWritePath)
-
-  override def getMeta: DataMapMeta = new DataMapMeta(List("c2").asJava, List(ExpressionType.EQUALS).asJava)
-
-  /**
-   * Get all distributable objects for a segment.
-   *
-   * @return list of distributable objects for the segment
-   */
-  override def toDistributable(segmentId: Segment): util.List[DataMapDistributable] = {
-    ???
-  }
-
-}
-
-class IndexDataMapWriterSuite extends QueryTest with BeforeAndAfterAll {
-  def buildTestData(numRows: Int): DataFrame = {
-    import sqlContext.implicits._
-    sqlContext.sparkContext.parallelize(1 to numRows, 1)
-      .map(x => ("a" + x, "b", x))
-      .toDF("c1", "c2", "c3")
-  }
-
-  def dropTable(): Unit = {
-    sql("DROP TABLE IF EXISTS carbon1")
-    sql("DROP TABLE IF EXISTS carbon2")
-  }
-
-  override def beforeAll {
-    dropTable()
-  }
-
-  test("test write datamap 2 pages") {
-    sql(s"CREATE TABLE carbon1(c1 STRING, c2 STRING, c3 INT) STORED BY 'org.apache.carbondata.format'")
-    // register datamap writer
-    sql(s"CREATE DATAMAP test ON TABLE carbon1 USING '${classOf[C2IndexDataMapFactory].getName}'")
-    val df = buildTestData(33000)
-
-    // save dataframe to carbon file
-    df.write
-      .format("carbondata")
-      .option("tableName", "carbon1")
-      .option("tempCSV", "false")
-      .option("sort_columns","c1")
-      .mode(SaveMode.Overwrite)
-      .save()
-
-    assert(IndexDataMapWriterSuite.callbackSeq.head.contains("block start"))
-    assert(IndexDataMapWriterSuite.callbackSeq.last.contains("block end"))
-    assert(
-      IndexDataMapWriterSuite.callbackSeq.slice(1, IndexDataMapWriterSuite.callbackSeq.length - 1) == Seq(
-        "blocklet start 0",
-        "add page data: blocklet 0, page 0",
-        "add page data: blocklet 0, page 1",
-        "blocklet end: 0"
-      ))
-    IndexDataMapWriterSuite.callbackSeq = Seq()
-  }
-
-  test("test write datamap 2 blocklet") {
-    sql(s"CREATE TABLE carbon2(c1 STRING, c2 STRING, c3 INT) STORED BY 'org.apache.carbondata.format'")
-    sql(s"CREATE DATAMAP test ON TABLE carbon2 USING '${classOf[C2IndexDataMapFactory].getName}'")
-
-    CarbonProperties.getInstance()
-      .addProperty("carbon.blockletgroup.size.in.mb", "1")
-    CarbonProperties.getInstance()
-      .addProperty("carbon.number.of.cores.while.loading",
-        CarbonCommonConstants.NUM_CORES_DEFAULT_VAL)
-
-    val df = buildTestData(300000)
-
-    // save dataframe to carbon file
-    df.write
-      .format("carbondata")
-      .option("tableName", "carbon2")
-      .option("tempCSV", "false")
-      .option("sort_columns","c1")
-      .option("SORT_SCOPE","GLOBAL_SORT")
-      .mode(SaveMode.Overwrite)
-      .save()
-
-    assert(IndexDataMapWriterSuite.callbackSeq.head.contains("block start"))
-    assert(IndexDataMapWriterSuite.callbackSeq.last.contains("block end"))
-    // corrected test case: the minimum "carbon.blockletgroup.size.in.mb" cannot be less than
-    // 64 MB
-    assert(
-      IndexDataMapWriterSuite.callbackSeq.slice(1, IndexDataMapWriterSuite.callbackSeq.length - 1) == Seq(
-        "blocklet start 0",
-        "add page data: blocklet 0, page 0",
-        "add page data: blocklet 0, page 1",
-        "add page data: blocklet 0, page 2",
-        "add page data: blocklet 0, page 3",
-        "add page data: blocklet 0, page 4",
-        "add page data: blocklet 0, page 5",
-        "add page data: blocklet 0, page 6",
-        "add page data: blocklet 0, page 7",
-        "add page data: blocklet 0, page 8",
-        "add page data: blocklet 0, page 9",
-        "blocklet end: 0"
-      ))
-    IndexDataMapWriterSuite.callbackSeq = Seq()
-  }
-
-  override def afterAll {
-    dropTable()
-  }
-}
-
-object IndexDataMapWriterSuite {
-
-  var callbackSeq: Seq[String] = Seq[String]()
-
-  def dataMapWriterC2Mock(identifier: AbsoluteTableIdentifier, segment: Segment,
-      dataWritePath: String) =
-    new AbstractDataMapWriter(identifier, segment, dataWritePath) {
-
-    override def onPageAdded(
-        blockletId: Int,
-        pageId: Int,
-        pages: Array[ColumnPage]): Unit = {
-      assert(pages.length == 1)
-      assert(pages(0).getDataType == DataTypes.STRING)
-      val bytes: Array[Byte] = pages(0).getByteArrayPage()(0)
-      assert(bytes.sameElements(Seq(0, 1, 'b'.toByte)))
-      callbackSeq :+= s"add page data: blocklet $blockletId, page $pageId"
-    }
-
-    override def onBlockletEnd(blockletId: Int): Unit = {
-      callbackSeq :+= s"blocklet end: $blockletId"
-    }
-
-    override def onBlockEnd(blockId: String): Unit = {
-      callbackSeq :+= s"block end $blockId"
-    }
-
-    override def onBlockletStart(blockletId: Int): Unit = {
-      callbackSeq :+= s"blocklet start $blockletId"
-    }
-
-    /**
-     * Start of new block notification.
-     *
-     * @param blockId file name of the carbondata file
-     */
-    override def onBlockStart(blockId: String) = {
-      callbackSeq :+= s"block start $blockId"
-    }
-
-    /**
-     * This is called while the writer is being closed, so no more data will be sent to this
-     * class after this call.
-     */
-    override def finish() = {
-
-    }
-  }
-}
\ No newline at end of file
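
The deleted IndexDataMapWriterSuite above asserts a strict ordering of writer callbacks per block:
block start, blocklet start, one "add page data" per page, blocklet end, block end. The sketch
below reproduces that event sequence with a simplified, hypothetical WriterCallbacks trait; the
real contract is AbstractDataMapWriter, whose onPageAdded additionally receives Array[ColumnPage].

import scala.collection.mutable.ArrayBuffer

// Simplified stand-in for the callback contract exercised by the suite above.
trait WriterCallbacks {
  def onBlockStart(blockId: String): Unit
  def onBlockletStart(blockletId: Int): Unit
  def onPageAdded(blockletId: Int, pageId: Int): Unit
  def onBlockletEnd(blockletId: Int): Unit
  def onBlockEnd(blockId: String): Unit
  def finish(): Unit
}

object CallbackOrderDemo {
  def main(args: Array[String]): Unit = {
    val events = ArrayBuffer[String]()
    val writer = new WriterCallbacks {
      def onBlockStart(blockId: String): Unit = events += s"block start $blockId"
      def onBlockletStart(blockletId: Int): Unit = events += s"blocklet start $blockletId"
      def onPageAdded(blockletId: Int, pageId: Int): Unit =
        events += s"add page data: blocklet $blockletId, page $pageId"
      def onBlockletEnd(blockletId: Int): Unit = events += s"blocklet end: $blockletId"
      def onBlockEnd(blockId: String): Unit = events += s"block end $blockId"
      def finish(): Unit = ()
    }
    // One block containing one blocklet with two pages: the shape asserted by
    // "test write datamap 2 pages".
    writer.onBlockStart("part-0-block")
    writer.onBlockletStart(0)
    writer.onPageAdded(0, 0)
    writer.onPageAdded(0, 1)
    writer.onBlockletEnd(0)
    writer.onBlockEnd("part-0-block")
    writer.finish()
    events.foreach(println)
  }
}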

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fc2a7eb3/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
new file mode 100644
index 0000000..eaa2ae7
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.datamap
+
+import java.io.{File, FilenameFilter}
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.common.exceptions.MetadataProcessException
+import org.apache.carbondata.common.exceptions.sql.{MalformedDataMapCommandException, NoSuchDataMapException}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
+
+  val testData = s"$resourcesPath/sample.csv"
+
+  override def beforeAll {
+    sql("drop table if exists datamaptest")
+    sql("drop table if exists datamapshowtest")
+    sql("drop table if exists uniqdata")
+    sql("create table datamaptest (a string, b string, c string) stored by 'carbondata'")
+  }
+
+  val newClass = "org.apache.spark.sql.CarbonSource"
+
+  test("test datamap create: don't support using non-exist class") {
+    intercept[MetadataProcessException] {
+      sql(s"CREATE DATAMAP datamap1 ON TABLE datamaptest USING '$newClass'")
+    }
+  }
+
+  test("test datamap create with dmproperties: don't support using non-exist class") {
+    intercept[MetadataProcessException] {
+      sql(s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass' DMPROPERTIES('key'='value')")
+    }
+  }
+
+  test("test datamap create with existing name: don't support using non-exist class") {
+    intercept[MetadataProcessException] {
+      sql(
+        s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass' DMPROPERTIES('key'='value')")
+    }
+  }
+
+  test("test datamap create with preagg") {
+    sql("drop datamap if exists datamap3 on table datamaptest")
+    sql(
+      "create datamap datamap3 on table datamaptest using 'preaggregate' as select count(a) from datamaptest")
+    val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
+    assert(table != null)
+    val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
+    assert(dataMapSchemaList.size() == 1)
+    assert(dataMapSchemaList.get(0).getDataMapName.equals("datamap3"))
+    assert(dataMapSchemaList.get(0).getChildSchema.getTableName.equals("datamaptest_datamap3"))
+  }
+
+  test("check hivemetastore after drop datamap") {
+    try {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+          "true")
+      sql("drop table if exists hiveMetaStoreTable")
+      sql("create table hiveMetaStoreTable (a string, b string, c string) stored by 'carbondata'")
+
+      sql(
+        "create datamap datamap_hiveMetaStoreTable on table hiveMetaStoreTable using 'preaggregate' as select count(a) from hiveMetaStoreTable")
+      checkExistence(sql("show datamap on table hiveMetaStoreTable"), true, "datamap_hiveMetaStoreTable")
+
+      sql("drop datamap datamap_hiveMetaStoreTable on table hiveMetaStoreTable")
+      checkExistence(sql("show datamap on table hiveMetaStoreTable"), false, "datamap_hiveMetaStoreTable")
+
+    } finally {
+      sql("drop table hiveMetaStoreTable")
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+          CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
+    }
+  }
+
+  test("drop the table having pre-aggregate") {
+    try {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+          "true")
+      sql("drop table if exists hiveMetaStoreTable_1")
+      sql("create table hiveMetaStoreTable_1 (a string, b string, c string) stored by 'carbondata'")
+
+      sql(
+        "create datamap datamap_hiveMetaStoreTable_1 on table hiveMetaStoreTable_1 using 'preaggregate' as select count(a) from hiveMetaStoreTable_1")
+
+      checkExistence(sql("show datamap on table hiveMetaStoreTable_1"),
+        true,
+        "datamap_hiveMetaStoreTable_1")
+
+      sql("drop datamap datamap_hiveMetaStoreTable_1 on table hiveMetaStoreTable_1")
+      checkExistence(sql("show datamap on table hiveMetaStoreTable_1"),
+        false,
+        "datamap_hiveMetaStoreTable_1")
+      assert(sql("show datamap on table hiveMetaStoreTable_1").collect().length == 0)
+      sql("drop table hiveMetaStoreTable_1")
+
+      checkExistence(sql("show tables"), false, "datamap_hiveMetaStoreTable_1")
+    }
+    finally {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+          CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
+    }
+  }
+
+  test("test datamap create with preagg with duplicate name") {
+    sql(
+      s"""
+         | CREATE DATAMAP datamap10 ON TABLE datamaptest
+         | USING 'preaggregate'
+         | AS SELECT COUNT(a) FROM datamaptest
+         """.stripMargin)
+    intercept[MalformedDataMapCommandException] {
+      sql(
+        s"""
+           | CREATE DATAMAP datamap10 ON TABLE datamaptest
+           | USING 'preaggregate'
+           | AS SELECT COUNT(a) FROM datamaptest
+         """.stripMargin)
+    }
+    val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
+    assert(table != null)
+    val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
+    assert(dataMapSchemaList.size() == 2)
+  }
+
+  test("test drop non-exist datamap") {
+    intercept[NoSuchDataMapException] {
+      sql("drop datamap nonexist on table datamaptest")
+    }
+    val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
+    assert(table != null)
+    val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
+    assert(dataMapSchemaList.size() == 2)
+  }
+
+  test("test show datamap without preaggregate: don't support using non-exist class") {
+    intercept[MetadataProcessException] {
+      sql("drop table if exists datamapshowtest")
+      sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
+      sql(s"CREATE DATAMAP datamap1 ON TABLE datamapshowtest USING '$newClass' ")
+      sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' ")
+      checkExistence(sql("SHOW DATAMAP ON TABLE datamapshowtest"), true, "datamap1", "datamap2", "(NA)", newClass)
+    }
+  }
+
+  test("test show datamap with preaggregate: don't support using non-exist class") {
+    intercept[MetadataProcessException] {
+      sql("drop table if exists datamapshowtest")
+      sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
+      sql("create datamap datamap1 on table datamapshowtest using 'preaggregate' as select count(a) from datamapshowtest")
+      sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' ")
+      val frame = sql("show datamap on table datamapshowtest")
+      assert(frame.collect().length == 2)
+      checkExistence(frame, true, "datamap1", "datamap2", "(NA)", newClass, "default.datamapshowtest_datamap1")
+    }
+  }
+
+  test("test show datamap with no datamap") {
+    sql("drop table if exists datamapshowtest")
+    sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
+    assert(sql("show datamap on table datamapshowtest").collect().length == 0)
+  }
+
+  test("test show datamap after dropping datamap: don't support using non-exist class") {
+    intercept[MetadataProcessException] {
+      sql("drop table if exists datamapshowtest")
+      sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
+      sql("create datamap datamap1 on table datamapshowtest using 'preaggregate' as select count(a) from datamapshowtest")
+      sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' ")
+      sql("drop datamap datamap1 on table datamapshowtest")
+      val frame = sql("show datamap on table datamapshowtest")
+      assert(frame.collect().length == 1)
+      checkExistence(frame, true, "datamap2", "(NA)", newClass)
+    }
+  }
+
+  test("test if preaggregate load is successfull for hivemetastore") {
+    try {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, "true")
+      sql("DROP TABLE IF EXISTS maintable")
+      sql(
+        """
+          | CREATE TABLE maintable(id int, name string, city string, age int)
+          | STORED BY 'org.apache.carbondata.format'
+        """.stripMargin)
+      sql(
+        "create datamap preagg_sum on table maintable using 'preaggregate' " +
+        "as select id,sum(age) from maintable group by id")
+      sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+      checkAnswer(sql(s"select * from maintable_preagg_sum"),
+        Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55)))
+    } finally {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+          CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
+    }
+  }
+
+  test("test preaggregate load for decimal column for hivemetastore") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, "true")
+    sql("CREATE TABLE uniqdata(CUST_ID int,CUST_NAME String,ACTIVE_EMUI_VERSION string,DOB timestamp,DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 decimal(30,10),DECIMAL_COLUMN2 decimal(36,10),Double_COLUMN1 double, Double_COLUMN2 double,INTEGER_COLUMN1 int) STORED BY 'org.apache.carbondata.format'")
+    sql("insert into uniqdata select 9000,'CUST_NAME_00000','ACTIVE_EMUI_VERSION_00000','1970-01-01 01:00:03','1970-01-01 02:00:03',123372036854,-223372036854,12345678901.1234000000,22345678901.1234000000,11234567489.7976000000,-11234567489.7976000000,1")
+    sql("create datamap uniqdata_agg on table uniqdata using 'preaggregate' as select min(DECIMAL_COLUMN1) from uniqdata group by DECIMAL_COLUMN1")
+    checkAnswer(sql("select * from uniqdata_uniqdata_agg"), Seq(Row(12345678901.1234000000, 12345678901.1234000000)))
+    sql("drop datamap if exists uniqdata_agg on table uniqdata")
+  }
+
+  test("create pre-agg table with path") {
+    sql("drop table if exists main_preagg")
+    sql("drop table if exists main ")
+    val warehouse = s"$metastoredb/warehouse"
+    val path = warehouse + "/" + System.nanoTime + "_preAggTestPath"
+    sql(
+      s"""
+         | create table main(
+         |     year int,
+         |     month int,
+         |     name string,
+         |     salary int)
+         | stored by 'carbondata'
+         | tblproperties('sort_columns'='month,year,name')
+      """.stripMargin)
+    sql("insert into main select 10,11,'amy',12")
+    sql("insert into main select 10,11,'amy',14")
+    sql(
+      s"""
+         | create datamap preagg
+         | on table main
+         | using 'preaggregate'
+         | dmproperties ('path'='$path')
+         | as select name,avg(salary)
+         |    from main
+         |    group by name
+       """.stripMargin)
+    assertResult(true)(new File(path).exists())
+    assertResult(true)(new File(s"${CarbonTablePath.getSegmentPath(path, "0")}")
+      .list(new FilenameFilter {
+        override def accept(dir: File, name: String): Boolean = {
+          name.contains(CarbonCommonConstants.FACT_FILE_EXT)
+        }
+      }).length > 0)
+    checkAnswer(sql("select name,avg(salary) from main group by name"), Row("amy", 13.0))
+    checkAnswer(sql("select * from main_preagg"), Row("amy", 26, 2))
+    sql("drop datamap preagg on table main")
+    assertResult(false)(new File(path).exists())
+    sql("drop table main")
+  }
+
+  override def afterAll {
+    sql("DROP TABLE IF EXISTS maintable")
+    sql("drop table if exists uniqdata")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+      CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
+    sql("drop table if exists datamaptest")
+    sql("drop table if exists datamapshowtest")
+  }
+}
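
The "create pre-agg table with path" test ends by asserting Row("amy", 26, 2) for the child table
and Row("amy", 13.0) for the parent query: the pre-aggregate table keeps partial aggregates (sum
and count) per group so that avg can be derived at query time. A tiny standalone check of that
arithmetic, using the two rows inserted for "amy" (an illustrative snippet, not CarbonData code):

object PreAggAvgCheck {
  def main(args: Array[String]): Unit = {
    val salaries = Seq(12, 14)          // the two rows inserted into 'main' for "amy"
    val sum = salaries.sum              // 26, the value stored in main_preagg
    val count = salaries.size           // 2, also stored in main_preagg
    val avg = sum.toDouble / count      // 13.0, the value the parent query returns
    println(s"sum=$sum count=$count avg=$avg")
    assert(sum == 26 && count == 2 && avg == 13.0)
  }
}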