Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/02/25 12:54:41 UTC

[07/10] carbondata git commit: [CARBONDATA-2189] Add DataMapProvider developer interface
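
For readers of this patch, the new test CGIndexDataMapTestCase (added below) shows the full developer flow end to end. A minimal sketch of the registration side, assuming a factory such as CGIndexDataMapFactory from that test; the table, datamap and file names here are illustrative only and not part of this commit:

    import org.apache.spark.sql.test.util.QueryTest
    import org.apache.carbondata.spark.testsuite.datamap.CGIndexDataMapFactory

    // Sketch only: mirrors the registration pattern used in CGIndexDataMapTestCase below.
    class CustomDataMapUsageSketch extends QueryTest {
      def registerAndUse(): Unit = {
        sql("CREATE TABLE sample(id INT, name STRING, city STRING, age INT) " +
            "STORED BY 'org.apache.carbondata.format'")
        // The factory class name in USING ties the DDL to the developer implementation;
        // DMPROPERTIES reach the factory through DataMapSchema.getProperties.
        sql(s"CREATE DATAMAP sample_dm ON TABLE sample " +
            s"USING '${classOf[CGIndexDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='name')")
        // Loads then drive the factory's createWriter() callbacks, and filters on the
        // indexed column are pruned through getDataMaps()/prune().
        sql("LOAD DATA LOCAL INPATH '/tmp/sample.csv' INTO TABLE sample OPTIONS('header'='false')")
      }
    }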

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 04902f9..98c3398 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -42,7 +42,7 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.exception.InvalidConfigurationException;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
-import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletIndexDataMapFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.schema.PartitionInfo;
@@ -732,7 +732,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
       DistributableDataMapFormat datamapDstr =
           new DistributableDataMapFormat(absoluteTableIdentifier, dataMapExprWrapper,
               segmentIds, partitionsToPrune,
-              BlockletDataMapFactory.class.getName());
+              BlockletIndexDataMapFactory.class.getName());
       prunedBlocklets = dataMapJob.execute(datamapDstr, resolver);
       // Apply expression on the blocklets.
       prunedBlocklets = dataMapExprWrapper.pruneBlocklets(prunedBlocklets);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
index 0d33797..d84c0e7 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
@@ -265,7 +265,7 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
            | GROUP BY dob,name
        """.stripMargin)
     }
-    assert(e.getMessage.contains(s"$timeSeries keyword missing"))
+    assert(e.getMessage.contains("Only 'path' dmproperty is allowed for this datamap"))
     sql("DROP TABLE IF EXISTS maintabletime")
   }
 
@@ -282,7 +282,7 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
           | GROUP BY column3,column5,column2
         """.stripMargin)
     }
-    assert(e.getMessage.contains("DataMap class 'abc' not found"))
+    assert(e.getMessage.contains("DataMap 'abc' not found"))
     sql("DROP DATAMAP IF EXISTS agg0 ON TABLE maintable")
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
index f9ac354..a973bfd 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
@@ -293,8 +293,8 @@ test("test PreAggregate table selection with timeseries and normal together") {
        | GROUP BY dob,name
        """.stripMargin)
 
-    val df = sql("SELECT timeseries(dob,'year') FROM maintabletime GROUP BY timeseries(dob,'year')")
-    preAggTableValidator(df.queryExecution.analyzed, "maintabletime_agg1_year")
+  val df = sql("SELECT timeseries(dob,'year') FROM maintabletime GROUP BY timeseries(dob,'year')")
+  preAggTableValidator(df.queryExecution.analyzed, "maintabletime_agg1_year")
   sql("DROP TABLE IF EXISTS maintabletime")
 
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
index 49cabea..0868eed 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
@@ -200,7 +200,7 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
           | GROUP BY dataTime
         """.stripMargin)
     }
-    assert(e.getMessage.equals("DataMap class 'abc' not found"))
+    assert(e.getMessage.equals("DataMap 'abc' not found"))
   }
 
   test("test timeseries create table: USING and catch MalformedCarbonCommandException") {
@@ -215,7 +215,7 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
           | GROUP BY dataTime
         """.stripMargin)
     }
-    assert(e.getMessage.equals("DataMap class 'abc' not found"))
+    assert(e.getMessage.equals("DataMap 'abc' not found"))
   }
 
   test("test timeseries create table: Only one granularity level can be defined 1") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
deleted file mode 100644
index d4c49d2..0000000
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
+++ /dev/null
@@ -1,379 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark.testsuite.datamap
-
-import java.io.{ByteArrayInputStream, DataOutputStream, ObjectInputStream, ObjectOutputStream}
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-
-import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream
-import org.apache.spark.sql.test.util.QueryTest
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainDataMap, AbstractCoarseGrainDataMapFactory}
-import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMapModel}
-import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
-import org.apache.carbondata.core.datastore.FileReader
-import org.apache.carbondata.core.datastore.block.SegmentProperties
-import org.apache.carbondata.core.datastore.compression.SnappyCompressor
-import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.datastore.page.ColumnPage
-import org.apache.carbondata.core.indexstore.Blocklet
-import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
-import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
-import org.apache.carbondata.core.scan.expression.Expression
-import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression
-import org.apache.carbondata.core.scan.filter.intf.ExpressionType
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
-import org.apache.carbondata.core.util.ByteUtil
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.events.Event
-import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlobalSortBigFileTest
-
-class CGDataMapFactory extends AbstractCoarseGrainDataMapFactory {
-  var identifier: AbsoluteTableIdentifier = _
-  var dataMapSchema: DataMapSchema = _
-
-  /**
-   * Initialization of Datamap factory with the identifier and datamap name
-   */
-  override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = {
-    this.identifier = identifier
-    this.dataMapSchema = dataMapSchema
-  }
-
-  /**
-   * Return a new write for this datamap
-   */
-  override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter = {
-    new CGDataMapWriter(identifier, segmentId, dataWritePath, dataMapSchema)
-  }
-
-  /**
-   * Get the datamap for segmentid
-   */
-  override def getDataMaps(segmentId: String) = {
-    val file = FileFactory.getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
-
-    val files = file.listFiles(new CarbonFileFilter {
-      override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
-    })
-    files.map {f =>
-      val dataMap: AbstractCoarseGrainDataMap = new CGDataMap()
-      dataMap.init(new DataMapModel(f.getCanonicalPath))
-      dataMap
-    }.toList.asJava
-  }
-
-
-  /**
-   * Get datamaps for distributable object.
-   */
-  override def getDataMaps(
-      distributable: DataMapDistributable): java.util.List[AbstractCoarseGrainDataMap] = {
-    val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable]
-    val dataMap: AbstractCoarseGrainDataMap = new CGDataMap()
-    dataMap.init(new DataMapModel(mapDistributable.getFilePath))
-    Seq(dataMap).asJava
-  }
-
-  /**
-   *
-   * @param event
-   */
-  override def fireEvent(event: Event): Unit = {
-    ???
-  }
-
-  /**
-   * Get all distributable objects of a segmentid
-   *
-   * @return
-   */
-  override def toDistributable(segmentId: String) = {
-    val file = FileFactory.getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
-
-    val files = file.listFiles(new CarbonFileFilter {
-      override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
-    })
-    files.map { f =>
-      val d:DataMapDistributable = new BlockletDataMapDistributable(f.getCanonicalPath)
-      d
-    }.toList.asJava
-  }
-
-
-  /**
-   * Clears datamap of the segment
-   */
-  override def clear(segmentId: String): Unit = {
-
-  }
-
-  /**
-   * Clear all datamaps from memory
-   */
-  override def clear(): Unit = {
-
-  }
-
-  /**
-   * Return metadata of this datamap
-   */
-  override def getMeta: DataMapMeta = {
-    new DataMapMeta(dataMapSchema.getProperties.get("indexcolumns").split(",").toList.asJava,
-      List(ExpressionType.EQUALS, ExpressionType.IN).asJava)
-  }
-}
-
-class CGDataMap extends AbstractCoarseGrainDataMap {
-
-  var maxMin: ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))] = _
-  var FileReader: FileReader = _
-  var filePath: String = _
-  val compressor = new SnappyCompressor
-
-  /**
-   * It is called to load the data map to memory or to initialize it.
-   */
-  override def init(dataMapModel: DataMapModel): Unit = {
-    this.filePath = dataMapModel.getFilePath
-    val size = FileFactory.getCarbonFile(filePath).getSize
-    FileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath))
-    val footerLen = FileReader.readInt(filePath, size-4)
-    val bytes = FileReader.readByteArray(filePath, size-footerLen-4, footerLen)
-    val in = new ByteArrayInputStream(compressor.unCompressByte(bytes))
-    val obj = new ObjectInputStream(in)
-    maxMin = obj.readObject().asInstanceOf[ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))]]
-  }
-
-  /**
-   * Prune the datamap with filter expression. It returns the list of
-   * blocklets where these filters can exist.
-   *
-   * @param filterExp
-   * @return
-   */
-  override def prune(
-      filterExp: FilterResolverIntf,
-      segmentProperties: SegmentProperties,
-      partitions: java.util.List[String]): java.util.List[Blocklet] = {
-    val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]()
-    val expression = filterExp.getFilterExpression
-    getEqualToExpression(expression, buffer)
-    val value = buffer.map { f =>
-      f.getChildren.get(1).evaluate(null).getString
-    }
-    val meta = findMeta(value(0).getBytes)
-    meta.map { f=>
-      new Blocklet(f._1, f._2+"")
-    }.asJava
-  }
-
-
-  private def findMeta(value: Array[Byte]) = {
-    val tuples = maxMin.filter { f =>
-      ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._1) <= 0 &&
-      ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._2) >= 0
-    }
-    tuples
-  }
-
-  private def getEqualToExpression(expression: Expression, buffer: ArrayBuffer[Expression]): Unit = {
-    if (expression.isInstanceOf[EqualToExpression]) {
-      buffer += expression
-    } else {
-      if (expression.getChildren != null) {
-        expression.getChildren.asScala.map { f =>
-          if (f.isInstanceOf[EqualToExpression]) {
-            buffer += f
-          }
-          getEqualToExpression(f, buffer)
-        }
-      }
-    }
-  }
-
-  /**
-   * Clear complete index table and release memory.
-   */
-  override def clear() = {
-    ???
-  }
-
-  override def isScanRequired(filterExp: FilterResolverIntf): Boolean = ???
-}
-
-class CGDataMapWriter(identifier: AbsoluteTableIdentifier,
-    segmentId: String,
-    dataWritePath: String,
-    dataMapSchema: DataMapSchema)
-  extends AbstractDataMapWriter(identifier, segmentId, dataWritePath) {
-
-  var currentBlockId: String = null
-  val cgwritepath = dataWritePath + "/" +
-                    dataMapSchema.getDataMapName + System.nanoTime() + ".datamap"
-  lazy val stream: DataOutputStream = FileFactory
-    .getDataOutputStream(cgwritepath, FileFactory.getFileType(cgwritepath))
-  val blockletList = new ArrayBuffer[Array[Byte]]()
-  val maxMin = new ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))]()
-  val compressor = new SnappyCompressor
-
-  /**
-   * Start of new block notification.
-   *
-   * @param blockId file name of the carbondata file
-   */
-  override def onBlockStart(blockId: String): Unit = {
-    currentBlockId = blockId
-  }
-
-  /**
-   * End of block notification
-   */
-  override def onBlockEnd(blockId: String): Unit = {
-
-  }
-
-  /**
-   * Start of new blocklet notification.
-   *
-   * @param blockletId sequence number of blocklet in the block
-   */
-  override def onBlockletStart(blockletId: Int): Unit = {
-
-  }
-
-  /**
-   * End of blocklet notification
-   *
-   * @param blockletId sequence number of blocklet in the block
-   */
-  override def onBlockletEnd(blockletId: Int): Unit = {
-    val sorted = blockletList
-      .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l, r) <= 0)
-    maxMin +=
-    ((currentBlockId+"", blockletId, (sorted.last, sorted.head)))
-    blockletList.clear()
-  }
-
-  /**
-   * Add the column pages row to the datamap, order of pages is same as `indexColumns` in
-   * DataMapMeta returned in DataMapFactory.
-   *
-   * Implementation should copy the content of `pages` as needed, because `pages` memory
-   * may be freed after this method returns, if using unsafe column page.
-   */
-  override def onPageAdded(blockletId: Int,
-      pageId: Int,
-      pages: Array[ColumnPage]): Unit = {
-    val size = pages(0).getPageSize
-    val list = new ArrayBuffer[Array[Byte]]()
-    var i = 0
-    while (i < size) {
-      val bytes = pages(0).getBytes(i)
-      val newBytes = new Array[Byte](bytes.length - 2)
-      System.arraycopy(bytes, 2, newBytes, 0, newBytes.length)
-      list += newBytes
-      i = i + 1
-    }
-    // Sort based on the column data in order to create index.
-    val sorted = list
-      .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l, r) <= 0)
-    blockletList += sorted.head
-    blockletList += sorted.last
-  }
-
-
-  /**
-   * This is called during closing of writer.So after this call no more data will be sent to this
-   * class.
-   */
-  override def finish(): Unit = {
-    val out = new ByteOutputStream()
-    val outStream = new ObjectOutputStream(out)
-    outStream.writeObject(maxMin)
-    outStream.close()
-    val bytes = compressor.compressByte(out.getBytes)
-    stream.write(bytes)
-    stream.writeInt(bytes.length)
-    stream.close()
-    commitFile(cgwritepath)
-  }
-
-
-}
-
-class CGDataMapTestCase extends QueryTest with BeforeAndAfterAll {
-
-  val file2 = resourcesPath + "/compaction/fil2.csv"
-  override protected def beforeAll(): Unit = {
-    //n should be about 5000000 of reset if size is default 1024
-    val n = 150000
-    CompactionSupportGlobalSortBigFileTest.createFile(file2, n * 4, n)
-    sql("DROP TABLE IF EXISTS normal_test")
-    sql(
-      """
-        | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT)
-        | STORED BY 'org.apache.carbondata.format'
-        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
-      """.stripMargin)
-    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')")
-  }
-
-  test("test cg datamap") {
-    sql("DROP TABLE IF EXISTS datamap_test_cg")
-    sql(
-      """
-        | CREATE TABLE datamap_test_cg(id INT, name STRING, city STRING, age INT)
-        | STORED BY 'org.apache.carbondata.format'
-        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
-      """.stripMargin)
-    val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test_cg")
-    // register datamap writer
-    sql(s"create datamap cgdatamap on table datamap_test_cg using '${classOf[CGDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='name')")
-    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test_cg OPTIONS('header'='false')")
-    checkAnswer(sql("select * from datamap_test_cg where name='n502670'"),
-      sql("select * from normal_test where name='n502670'"))
-  }
-
-  test("test cg datamap with 2 datamaps ") {
-    sql("DROP TABLE IF EXISTS datamap_test")
-    sql(
-      """
-        | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
-        | STORED BY 'org.apache.carbondata.format'
-        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
-      """.stripMargin)
-    val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test")
-    // register datamap writer
-    sql(s"create datamap ggdatamap1 on table datamap_test using '${classOf[CGDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='name')")
-    sql(s"create datamap ggdatamap2 on table datamap_test using '${classOf[CGDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='city')")
-    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
-    checkAnswer(sql("select * from datamap_test where name='n502670' and city='c2670'"),
-      sql("select * from normal_test where name='n502670' and city='c2670'"))
-  }
-
-  override protected def afterAll(): Unit = {
-    CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
-    sql("DROP TABLE IF EXISTS normal_test")
-    sql("DROP TABLE IF EXISTS datamap_test_cg")
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGIndexDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGIndexDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGIndexDataMapTestCase.scala
new file mode 100644
index 0000000..5e944fb
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGIndexDataMapTestCase.scala
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.datamap
+
+import java.io.{ByteArrayInputStream, DataOutputStream, ObjectInputStream, ObjectOutputStream}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
+import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMapModel}
+import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainIndexDataMap, AbstractCoarseGrainIndexDataMapFactory}
+import org.apache.carbondata.core.datastore.FileReader
+import org.apache.carbondata.core.datastore.block.SegmentProperties
+import org.apache.carbondata.core.datastore.compression.SnappyCompressor
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.datastore.page.ColumnPage
+import org.apache.carbondata.core.indexstore.Blocklet
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
+import org.apache.carbondata.core.scan.expression.Expression
+import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
+import org.apache.carbondata.core.util.ByteUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.Event
+import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlobalSortBigFileTest
+
+class CGIndexDataMapFactory extends AbstractCoarseGrainIndexDataMapFactory {
+  var identifier: AbsoluteTableIdentifier = _
+  var dataMapSchema: DataMapSchema = _
+
+  /**
+   * Initialization of Datamap factory with the identifier and datamap name
+   */
+  override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = {
+    this.identifier = identifier
+    this.dataMapSchema = dataMapSchema
+  }
+
+  /**
+   * Return a new write for this datamap
+   */
+  override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter = {
+    new CGDataMapWriter(identifier, segmentId, dataWritePath, dataMapSchema)
+  }
+
+  /**
+   * Get the datamap for segmentid
+   */
+  override def getDataMaps(segmentId: String) = {
+    val file = FileFactory.getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
+
+    val files = file.listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
+    })
+    files.map {f =>
+      val dataMap: AbstractCoarseGrainIndexDataMap = new CGIndexDataMap()
+      dataMap.init(new DataMapModel(f.getCanonicalPath))
+      dataMap
+    }.toList.asJava
+  }
+
+
+  /**
+   * Get datamaps for distributable object.
+   */
+  override def getDataMaps(
+      distributable: DataMapDistributable): java.util.List[AbstractCoarseGrainIndexDataMap] = {
+    val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable]
+    val dataMap: AbstractCoarseGrainIndexDataMap = new CGIndexDataMap()
+    dataMap.init(new DataMapModel(mapDistributable.getFilePath))
+    Seq(dataMap).asJava
+  }
+
+  /**
+   *
+   * @param event
+   */
+  override def fireEvent(event: Event): Unit = {
+    ???
+  }
+
+  /**
+   * Get all distributable objects of a segmentid
+   *
+   * @return
+   */
+  override def toDistributable(segmentId: String) = {
+    val file = FileFactory.getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
+
+    val files = file.listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
+    })
+    files.map { f =>
+      val d:DataMapDistributable = new BlockletDataMapDistributable(f.getCanonicalPath)
+      d
+    }.toList.asJava
+  }
+
+
+  /**
+   * Clears datamap of the segment
+   */
+  override def clear(segmentId: String): Unit = {
+
+  }
+
+  /**
+   * Clear all datamaps from memory
+   */
+  override def clear(): Unit = {
+
+  }
+
+  /**
+   * Return metadata of this datamap
+   */
+  override def getMeta: DataMapMeta = {
+    new DataMapMeta(dataMapSchema.getProperties.get("indexcolumns").split(",").toList.asJava,
+      List(ExpressionType.EQUALS, ExpressionType.IN).asJava)
+  }
+}
+
+class CGIndexDataMap extends AbstractCoarseGrainIndexDataMap {
+
+  var maxMin: ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))] = _
+  var FileReader: FileReader = _
+  var filePath: String = _
+  val compressor = new SnappyCompressor
+
+  /**
+   * It is called to load the data map to memory or to initialize it.
+   */
+  override def init(dataMapModel: DataMapModel): Unit = {
+    this.filePath = dataMapModel.getFilePath
+    val size = FileFactory.getCarbonFile(filePath).getSize
+    FileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath))
+    val footerLen = FileReader.readInt(filePath, size-4)
+    val bytes = FileReader.readByteArray(filePath, size-footerLen-4, footerLen)
+    val in = new ByteArrayInputStream(compressor.unCompressByte(bytes))
+    val obj = new ObjectInputStream(in)
+    maxMin = obj.readObject().asInstanceOf[ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))]]
+  }
+
+  /**
+   * Prune the datamap with filter expression. It returns the list of
+   * blocklets where these filters can exist.
+   *
+   * @param filterExp
+   * @return
+   */
+  override def prune(
+      filterExp: FilterResolverIntf,
+      segmentProperties: SegmentProperties,
+      partitions: java.util.List[String]): java.util.List[Blocklet] = {
+    val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]()
+    val expression = filterExp.getFilterExpression
+    getEqualToExpression(expression, buffer)
+    val value = buffer.map { f =>
+      f.getChildren.get(1).evaluate(null).getString
+    }
+    val meta = findMeta(value(0).getBytes)
+    meta.map { f=>
+      new Blocklet(f._1, f._2+"")
+    }.asJava
+  }
+
+
+  private def findMeta(value: Array[Byte]) = {
+    val tuples = maxMin.filter { f =>
+      ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._1) <= 0 &&
+      ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._2) >= 0
+    }
+    tuples
+  }
+
+  private def getEqualToExpression(expression: Expression, buffer: ArrayBuffer[Expression]): Unit = {
+    if (expression.isInstanceOf[EqualToExpression]) {
+      buffer += expression
+    } else {
+      if (expression.getChildren != null) {
+        expression.getChildren.asScala.map { f =>
+          if (f.isInstanceOf[EqualToExpression]) {
+            buffer += f
+          }
+          getEqualToExpression(f, buffer)
+        }
+      }
+    }
+  }
+
+  /**
+   * Clear complete index table and release memory.
+   */
+  override def clear() = {
+    ???
+  }
+
+  override def isScanRequired(filterExp: FilterResolverIntf): Boolean = ???
+}
+
+class CGDataMapWriter(identifier: AbsoluteTableIdentifier,
+    segmentId: String,
+    dataWritePath: String,
+    dataMapSchema: DataMapSchema)
+  extends AbstractDataMapWriter(identifier, segmentId, dataWritePath) {
+
+  var currentBlockId: String = null
+  val cgwritepath = dataWritePath + "/" +
+                    dataMapSchema.getDataMapName + System.nanoTime() + ".datamap"
+  lazy val stream: DataOutputStream = FileFactory
+    .getDataOutputStream(cgwritepath, FileFactory.getFileType(cgwritepath))
+  val blockletList = new ArrayBuffer[Array[Byte]]()
+  val maxMin = new ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))]()
+  val compressor = new SnappyCompressor
+
+  /**
+   * Start of new block notification.
+   *
+   * @param blockId file name of the carbondata file
+   */
+  override def onBlockStart(blockId: String): Unit = {
+    currentBlockId = blockId
+  }
+
+  /**
+   * End of block notification
+   */
+  override def onBlockEnd(blockId: String): Unit = {
+
+  }
+
+  /**
+   * Start of new blocklet notification.
+   *
+   * @param blockletId sequence number of blocklet in the block
+   */
+  override def onBlockletStart(blockletId: Int): Unit = {
+
+  }
+
+  /**
+   * End of blocklet notification
+   *
+   * @param blockletId sequence number of blocklet in the block
+   */
+  override def onBlockletEnd(blockletId: Int): Unit = {
+    val sorted = blockletList
+      .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l, r) <= 0)
+    maxMin +=
+    ((currentBlockId+"", blockletId, (sorted.last, sorted.head)))
+    blockletList.clear()
+  }
+
+  /**
+   * Add the column pages row to the datamap, order of pages is same as `indexColumns` in
+   * DataMapMeta returned in IndexDataMapFactory.
+   *
+   * Implementation should copy the content of `pages` as needed, because `pages` memory
+   * may be freed after this method returns, if using unsafe column page.
+   */
+  override def onPageAdded(blockletId: Int,
+      pageId: Int,
+      pages: Array[ColumnPage]): Unit = {
+    val size = pages(0).getPageSize
+    val list = new ArrayBuffer[Array[Byte]]()
+    var i = 0
+    while (i < size) {
+      val bytes = pages(0).getBytes(i)
+      val newBytes = new Array[Byte](bytes.length - 2)
+      System.arraycopy(bytes, 2, newBytes, 0, newBytes.length)
+      list += newBytes
+      i = i + 1
+    }
+    // Sort based on the column data in order to create index.
+    val sorted = list
+      .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l, r) <= 0)
+    blockletList += sorted.head
+    blockletList += sorted.last
+  }
+
+
+  /**
+   * This is called during closing of writer.So after this call no more data will be sent to this
+   * class.
+   */
+  override def finish(): Unit = {
+    val out = new ByteOutputStream()
+    val outStream = new ObjectOutputStream(out)
+    outStream.writeObject(maxMin)
+    outStream.close()
+    val bytes = compressor.compressByte(out.getBytes)
+    stream.write(bytes)
+    stream.writeInt(bytes.length)
+    stream.close()
+    commitFile(cgwritepath)
+  }
+
+
+}
+
+class CGIndexDataMapTestCase extends QueryTest with BeforeAndAfterAll {
+
+  val file2 = resourcesPath + "/compaction/fil2.csv"
+  override protected def beforeAll(): Unit = {
+    //n should be about 5000000 of reset if size is default 1024
+    val n = 150000
+    CompactionSupportGlobalSortBigFileTest.createFile(file2, n * 4, n)
+    sql("DROP TABLE IF EXISTS normal_test")
+    sql(
+      """
+        | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')")
+  }
+
+  test("test cg datamap") {
+    sql("DROP TABLE IF EXISTS datamap_test_cg")
+    sql(
+      """
+        | CREATE TABLE datamap_test_cg(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test_cg")
+    // register datamap writer
+    sql(s"create datamap cgdatamap on table datamap_test_cg using '${classOf[CGIndexDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='name')")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test_cg OPTIONS('header'='false')")
+    checkAnswer(sql("select * from datamap_test_cg where name='n502670'"),
+      sql("select * from normal_test where name='n502670'"))
+  }
+
+  test("test cg datamap with 2 datamaps ") {
+    sql("DROP TABLE IF EXISTS datamap_test")
+    sql(
+      """
+        | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test")
+    // register datamap writer
+    sql(s"create datamap ggdatamap1 on table datamap_test using '${classOf[CGIndexDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='name')")
+    sql(s"create datamap ggdatamap2 on table datamap_test using '${classOf[CGIndexDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='city')")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
+    checkAnswer(sql("select * from datamap_test where name='n502670' and city='c2670'"),
+      sql("select * from normal_test where name='n502670' and city='c2670'"))
+  }
+
+  override protected def afterAll(): Unit = {
+    CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
+    sql("DROP TABLE IF EXISTS normal_test")
+    sql("DROP TABLE IF EXISTS datamap_test_cg")
+  }
+}

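For orientation, the writer callbacks implemented by CGDataMapWriter above arrive in a fixed order per carbondata file (block start, blocklet start, page added ..., blocklet end, block end, then finish), as asserted by the callback sequence in the DataMapWriterSuite diff below. A minimal no-op sketch of that contract, not part of this patch, with the override signatures taken from the writer above; the class name is illustrative:

    import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter
    import org.apache.carbondata.core.datastore.page.ColumnPage
    import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier

    // Sketch only: a writer that implements the required callbacks without building an index.
    class NoOpDataMapWriter(identifier: AbsoluteTableIdentifier, segmentId: String, writePath: String)
      extends AbstractDataMapWriter(identifier, segmentId, writePath) {
      override def onBlockStart(blockId: String): Unit = {}    // once per carbondata file
      override def onBlockletStart(blockletId: Int): Unit = {} // once per blocklet in the block
      override def onPageAdded(blockletId: Int, pageId: Int, pages: Array[ColumnPage]): Unit = {}
      override def onBlockletEnd(blockletId: Int): Unit = {}   // index entries are typically flushed here
      override def onBlockEnd(blockId: String): Unit = {}
      override def finish(): Unit = {}                         // close streams and commit the datamap file
    }
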
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
deleted file mode 100644
index 903610a..0000000
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.testsuite.datamap
-
-import java.util
-
-import scala.collection.JavaConverters._
-import org.apache.spark.sql.test.util.QueryTest
-import org.apache.spark.sql.{DataFrame, SaveMode}
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter
-import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainDataMap, AbstractCoarseGrainDataMapFactory}
-import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
-import org.apache.carbondata.core.datastore.page.ColumnPage
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.datatype.DataTypes
-import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
-import org.apache.carbondata.core.scan.filter.intf.ExpressionType
-import org.apache.carbondata.core.metadata.datatype.DataTypes
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.events.Event
-
-class C2DataMapFactory() extends AbstractCoarseGrainDataMapFactory {
-
-  var identifier: AbsoluteTableIdentifier = _
-
-  override def init(identifier: AbsoluteTableIdentifier,
-      dataMapSchema: DataMapSchema): Unit = {
-    this.identifier = identifier
-  }
-
-  override def fireEvent(event: Event): Unit = ???
-
-  override def clear(segmentId: String): Unit = {}
-
-  override def clear(): Unit = {}
-
-  override def getDataMaps(distributable: DataMapDistributable): java.util.List[AbstractCoarseGrainDataMap] = ???
-
-  override def getDataMaps(segmentId: String): util.List[AbstractCoarseGrainDataMap] = ???
-
-  override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter =
-    DataMapWriterSuite.dataMapWriterC2Mock(identifier, segmentId, dataWritePath)
-
-  override def getMeta: DataMapMeta = new DataMapMeta(List("c2").asJava, List(ExpressionType.EQUALS).asJava)
-
-  /**
-   * Get all distributable objects of a segmentid
-   *
-   * @return
-   */
-  override def toDistributable(segmentId: String): util.List[DataMapDistributable] = {
-    ???
-  }
-
-}
-
-class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll {
-  def buildTestData(numRows: Int): DataFrame = {
-    import sqlContext.implicits._
-    sqlContext.sparkContext.parallelize(1 to numRows, 1)
-      .map(x => ("a" + x, "b", x))
-      .toDF("c1", "c2", "c3")
-  }
-
-  def dropTable(): Unit = {
-    sql("DROP TABLE IF EXISTS carbon1")
-    sql("DROP TABLE IF EXISTS carbon2")
-  }
-
-  override def beforeAll {
-    dropTable()
-  }
-
-  test("test write datamap 2 pages") {
-    sql(s"CREATE TABLE carbon1(c1 STRING, c2 STRING, c3 INT) STORED BY 'org.apache.carbondata.format'")
-    // register datamap writer
-    sql(s"CREATE DATAMAP test ON TABLE carbon1 USING '${classOf[C2DataMapFactory].getName}'")
-    val df = buildTestData(33000)
-
-    // save dataframe to carbon file
-    df.write
-      .format("carbondata")
-      .option("tableName", "carbon1")
-      .option("tempCSV", "false")
-      .option("sort_columns","c1")
-      .mode(SaveMode.Overwrite)
-      .save()
-
-    assert(DataMapWriterSuite.callbackSeq.head.contains("block start"))
-    assert(DataMapWriterSuite.callbackSeq.last.contains("block end"))
-    assert(
-      DataMapWriterSuite.callbackSeq.slice(1, DataMapWriterSuite.callbackSeq.length - 1) == Seq(
-        "blocklet start 0",
-        "add page data: blocklet 0, page 0",
-        "add page data: blocklet 0, page 1",
-        "blocklet end: 0"
-      ))
-    DataMapWriterSuite.callbackSeq = Seq()
-  }
-
-  test("test write datamap 2 blocklet") {
-    sql(s"CREATE TABLE carbon2(c1 STRING, c2 STRING, c3 INT) STORED BY 'org.apache.carbondata.format'")
-    sql(s"CREATE DATAMAP test ON TABLE carbon2 USING '${classOf[C2DataMapFactory].getName}'")
-
-    CarbonProperties.getInstance()
-      .addProperty("carbon.blockletgroup.size.in.mb", "1")
-    CarbonProperties.getInstance()
-      .addProperty("carbon.number.of.cores.while.loading",
-        CarbonCommonConstants.NUM_CORES_DEFAULT_VAL)
-
-    val df = buildTestData(300000)
-
-    // save dataframe to carbon file
-    df.write
-      .format("carbondata")
-      .option("tableName", "carbon2")
-      .option("tempCSV", "false")
-      .option("sort_columns","c1")
-      .option("SORT_SCOPE","GLOBAL_SORT")
-      .mode(SaveMode.Overwrite)
-      .save()
-
-    assert(DataMapWriterSuite.callbackSeq.head.contains("block start"))
-    assert(DataMapWriterSuite.callbackSeq.last.contains("block end"))
-    // corrected test case the min "carbon.blockletgroup.size.in.mb" size could not be less than
-    // 64 MB
-    assert(
-      DataMapWriterSuite.callbackSeq.slice(1, DataMapWriterSuite.callbackSeq.length - 1) == Seq(
-        "blocklet start 0",
-        "add page data: blocklet 0, page 0",
-        "add page data: blocklet 0, page 1",
-        "add page data: blocklet 0, page 2",
-        "add page data: blocklet 0, page 3",
-        "add page data: blocklet 0, page 4",
-        "add page data: blocklet 0, page 5",
-        "add page data: blocklet 0, page 6",
-        "add page data: blocklet 0, page 7",
-        "add page data: blocklet 0, page 8",
-        "add page data: blocklet 0, page 9",
-        "blocklet end: 0"
-      ))
-    DataMapWriterSuite.callbackSeq = Seq()
-  }
-
-  override def afterAll {
-    dropTable()
-  }
-}
-
-object DataMapWriterSuite {
-
-  var callbackSeq: Seq[String] = Seq[String]()
-
-  def dataMapWriterC2Mock(identifier: AbsoluteTableIdentifier, segmentId: String,
-      dataWritePath: String) =
-    new AbstractDataMapWriter(identifier, segmentId, dataWritePath) {
-
-    override def onPageAdded(
-        blockletId: Int,
-        pageId: Int,
-        pages: Array[ColumnPage]): Unit = {
-      assert(pages.length == 1)
-      assert(pages(0).getDataType == DataTypes.STRING)
-      val bytes: Array[Byte] = pages(0).getByteArrayPage()(0)
-      assert(bytes.sameElements(Seq(0, 1, 'b'.toByte)))
-      callbackSeq :+= s"add page data: blocklet $blockletId, page $pageId"
-    }
-
-    override def onBlockletEnd(blockletId: Int): Unit = {
-      callbackSeq :+= s"blocklet end: $blockletId"
-    }
-
-    override def onBlockEnd(blockId: String): Unit = {
-      callbackSeq :+= s"block end $blockId"
-    }
-
-    override def onBlockletStart(blockletId: Int): Unit = {
-      callbackSeq :+= s"blocklet start $blockletId"
-    }
-
-    /**
-     * Start of new block notification.
-     *
-     * @param blockId file name of the carbondata file
-     */
-    override def onBlockStart(blockId: String) = {
-      callbackSeq :+= s"block start $blockId"
-    }
-
-    /**
-     * This is called during closing of writer.So after this call no more data will be sent to this
-     * class.
-     */
-    override def finish() = {
-
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
deleted file mode 100644
index 8031dc2..0000000
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++ /dev/null
@@ -1,476 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark.testsuite.datamap
-
-import java.io.{ByteArrayInputStream, DataOutputStream, ObjectInputStream, ObjectOutputStream}
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-
-import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream
-import org.apache.spark.sql.test.util.QueryTest
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.carbondata.core.datamap.dev.fgdatamap.{AbstractFineGrainDataMap, AbstractFineGrainDataMapFactory}
-import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMapModel}
-import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
-import org.apache.carbondata.core.datastore.FileReader
-import org.apache.carbondata.core.datastore.block.SegmentProperties
-import org.apache.carbondata.core.datastore.compression.SnappyCompressor
-import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.datastore.page.ColumnPage
-import org.apache.carbondata.core.indexstore.FineGrainBlocklet
-import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
-import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
-import org.apache.carbondata.core.scan.expression.Expression
-import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression
-import org.apache.carbondata.core.scan.filter.intf.ExpressionType
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
-import org.apache.carbondata.core.util.ByteUtil
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.events.Event
-import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlobalSortBigFileTest
-
-class FGDataMapFactory extends AbstractFineGrainDataMapFactory {
-  var identifier: AbsoluteTableIdentifier = _
-  var dataMapSchema: DataMapSchema = _
-
-  /**
-   * Initialization of Datamap factory with the identifier and datamap name
-   */
-  override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = {
-    this.identifier = identifier
-    this.dataMapSchema = dataMapSchema
-  }
-
-  /**
-   * Return a new write for this datamap
-   */
-  override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter = {
-    new FGDataMapWriter(identifier, segmentId, dataWritePath, dataMapSchema)
-  }
-
-  /**
-   * Get the datamap for segmentid
-   */
-  override def getDataMaps(segmentId: String): java.util.List[AbstractFineGrainDataMap] = {
-    val file = FileFactory
-      .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
-
-    val files = file.listFiles(new CarbonFileFilter {
-      override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
-    })
-    files.map { f =>
-      val dataMap: AbstractFineGrainDataMap = new FGDataMap()
-      dataMap.init(new DataMapModel(f.getCanonicalPath))
-      dataMap
-    }.toList.asJava
-  }
-
-  /**
-   * Get datamap for distributable object.
-   */
-  override def getDataMaps(
-      distributable: DataMapDistributable): java.util.List[AbstractFineGrainDataMap]= {
-    val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable]
-    val dataMap: AbstractFineGrainDataMap = new FGDataMap()
-    dataMap.init(new DataMapModel(mapDistributable.getFilePath))
-    Seq(dataMap).asJava
-  }
-
-  /**
-   * Get all distributable objects of a segmentid
-   *
-   * @return
-   */
-  override def toDistributable(segmentId: String): java.util.List[DataMapDistributable] = {
-    val file = FileFactory
-      .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
-
-    val files = file.listFiles(new CarbonFileFilter {
-      override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
-    })
-    files.map { f =>
-      val d: DataMapDistributable = new BlockletDataMapDistributable(f.getCanonicalPath)
-      d
-    }.toList.asJava
-  }
-
-
-  /**
-   *
-   * @param event
-   */
-  override def fireEvent(event: Event):Unit = {
-    ???
-  }
-
-  /**
-   * Clears datamap of the segment
-   */
-  override def clear(segmentId: String): Unit = {
-  }
-
-  /**
-   * Clear all datamaps from memory
-   */
-  override def clear(): Unit = {
-  }
-
-  /**
-   * Return metadata of this datamap
-   */
-  override def getMeta: DataMapMeta = {
-    new DataMapMeta(dataMapSchema.getProperties.get("indexcolumns").split(",").toList.asJava,
-      List(ExpressionType.EQUALS, ExpressionType.IN).asJava)
-  }
-}
-
-class FGDataMap extends AbstractFineGrainDataMap {
-
-  var maxMin: ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)] = _
-  var FileReader: FileReader = _
-  var filePath: String = _
-  val compressor = new SnappyCompressor
-
-  /**
-   * It is called to load the data map to memory or to initialize it.
-   */
-  override def init(dataMapModel: DataMapModel): Unit = {
-    this.filePath = dataMapModel.getFilePath
-    val size = FileFactory.getCarbonFile(filePath).getSize
-    FileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath))
-    val footerLen = FileReader.readInt(filePath, size - 4)
-    val bytes = FileReader.readByteArray(filePath, size - footerLen - 4, footerLen)
-    val in = new ByteArrayInputStream(compressor.unCompressByte(bytes))
-    val obj = new ObjectInputStream(in)
-    maxMin = obj.readObject()
-      .asInstanceOf[ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)]]
-  }
-
-  /**
-   * Prune the datamap with filter expression. It returns the list of
-   * blocklets where these filters can exist.
-   *
-   * @param filterExp
-   * @return
-   */
-  override def prune(
-      filterExp: FilterResolverIntf,
-      segmentProperties: SegmentProperties,
-      partitions: java.util.List[String]): java.util.List[FineGrainBlocklet] = {
-    val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]()
-    val expression = filterExp.getFilterExpression
-    getEqualToExpression(expression, buffer)
-    val value = buffer.map { f =>
-      f.getChildren.get(1).evaluate(null).getString
-    }
-    val meta = findMeta(value(0).getBytes)
-    meta.map { f =>
-      readAndFindData(f, value(0).getBytes())
-    }.filter(_.isDefined).map(_.get).asJava
-  }
-
-  private def readAndFindData(meta: (String, Int, (Array[Byte], Array[Byte]), Long, Int),
-      value: Array[Byte]): Option[FineGrainBlocklet] = {
-    val bytes = FileReader.readByteArray(filePath, meta._4, meta._5)
-    val outputStream = new ByteArrayInputStream(compressor.unCompressByte(bytes))
-    val obj = new ObjectInputStream(outputStream)
-    val blockletsData = obj.readObject()
-      .asInstanceOf[ArrayBuffer[(Array[Byte], Seq[Seq[Int]], Seq[Int])]]
-
-    import scala.collection.Searching._
-    val searching = blockletsData
-      .search[(Array[Byte], Seq[Seq[Int]], Seq[Int])]((value, Seq(Seq(0)), Seq(0)))(new Ordering[
-      (Array[Byte], Seq[Seq[Int]], Seq[Int])] {
-      override def compare(x: (Array[Byte], Seq[Seq[Int]], Seq[Int]),
-          y: (Array[Byte], Seq[Seq[Int]], Seq[Int])) = {
-        ByteUtil.UnsafeComparer.INSTANCE.compareTo(x._1, y._1)
-      }
-    })
-    if (searching.insertionPoint >= 0) {
-      val f = blockletsData(searching.insertionPoint)
-      val pages = f._3.zipWithIndex.map { p =>
-        val pg = new FineGrainBlocklet.Page
-        pg.setPageId(p._1)
-        pg.setRowId(f._2(p._2).toArray)
-        pg
-      }
-      pages
-      Some(new FineGrainBlocklet(meta._1, meta._2.toString, pages.toList.asJava))
-    } else {
-      None
-    }
-
-  }
-
-  private def findMeta(value: Array[Byte]) = {
-    val tuples = maxMin.filter { f =>
-      ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._1) >= 0 &&
-      ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._2) <= 0
-    }
-    tuples
-  }
-
-  def getEqualToExpression(expression: Expression, buffer: ArrayBuffer[Expression]): Unit = {
-    if (expression.isInstanceOf[EqualToExpression]) {
-      buffer += expression
-    } else {
-      if (expression.getChildren != null) {
-        expression.getChildren.asScala.map { f =>
-          if (f.isInstanceOf[EqualToExpression]) {
-            buffer += f
-          }
-          getEqualToExpression(f, buffer)
-        }
-      }
-    }
-  }
-
-  /**
-   * Clear complete index table and release memory.
-   */
-  override def clear():Unit = {
-    ???
-  }
-
-  override def isScanRequired(filterExp: FilterResolverIntf): Boolean = ???
-}
-
-class FGDataMapWriter(identifier: AbsoluteTableIdentifier,
-    segmentId: String, dataWriterPath: String, dataMapSchema: DataMapSchema)
-  extends AbstractDataMapWriter(identifier, segmentId, dataWriterPath) {
-
-  var currentBlockId: String = null
-  val fgwritepath = dataWriterPath + "/" + dataMapSchema.getDataMapName + System.nanoTime() +
-                    ".datamap"
-  val stream: DataOutputStream = FileFactory
-    .getDataOutputStream(fgwritepath, FileFactory.getFileType(fgwritepath))
-  val blockletList = new ArrayBuffer[(Array[Byte], Seq[Int], Seq[Int])]()
-  val maxMin = new ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)]()
-  var position: Long = 0
-  val compressor = new SnappyCompressor
-
-  /**
-   * Start of new block notification.
-   *
-   * @param blockId file name of the carbondata file
-   */
-  override def onBlockStart(blockId: String): Unit = {
-    currentBlockId = blockId
-  }
-
-  /**
-   * End of block notification
-   */
-  override def onBlockEnd(blockId: String): Unit = {
-
-  }
-
-  /**
-   * Start of new blocklet notification.
-   *
-   * @param blockletId sequence number of blocklet in the block
-   */
-  override def onBlockletStart(blockletId: Int): Unit = {
-
-  }
-
-  /**
-   * End of blocklet notification
-   *
-   * @param blockletId sequence number of blocklet in the block
-   */
-  override def onBlockletEnd(blockletId: Int): Unit = {
-    val sorted = blockletList
-      .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l._1, r._1) <= 0)
-    var oldValue: (Array[Byte], Seq[Seq[Int]], Seq[Int]) = null
-    var addedLast: Boolean = false
-    val blockletListUpdated = new ArrayBuffer[(Array[Byte], Seq[Seq[Int]], Seq[Int])]()
-    // Merge all same column values to single row.
-    sorted.foreach { f =>
-      if (oldValue != null) {
-        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(f._1, oldValue._1) == 0) {
-          oldValue = (oldValue._1, oldValue._2 ++ Seq(f._2), oldValue._3 ++ f._3)
-          addedLast = false
-        } else {
-          blockletListUpdated += oldValue
-          oldValue = (f._1, Seq(f._2), f._3)
-          addedLast = true
-        }
-      } else {
-        oldValue = (f._1, Seq(f._2), f._3)
-        addedLast = false
-      }
-    }
-    if (!addedLast && oldValue != null) {
-      blockletListUpdated += oldValue
-    }
-
-    val out = new ByteOutputStream()
-    val outStream = new ObjectOutputStream(out)
-    outStream.writeObject(blockletListUpdated)
-    outStream.close()
-    val bytes = compressor.compressByte(out.getBytes)
-    stream.write(bytes)
-    maxMin +=
-    ((currentBlockId + "", blockletId, (blockletListUpdated.head._1, blockletListUpdated.last
-      ._1), position, bytes.length))
-    position += bytes.length
-    blockletList.clear()
-  }
-
-  /**
-   * Add the column pages row to the datamap, order of pages is same as `indexColumns` in
-   * DataMapMeta returned in DataMapFactory.
-   *
-   * Implementation should copy the content of `pages` as needed, because `pages` memory
-   * may be freed after this method returns, if using unsafe column page.
-   */
-  override def onPageAdded(blockletId: Int,
-      pageId: Int,
-      pages: Array[ColumnPage]): Unit = {
-    val size = pages(0).getPageSize
-    val list = new ArrayBuffer[(Array[Byte], Int)]()
-    var i = 0
-    while (i < size) {
-      val bytes = pages(0).getBytes(i)
-      val newBytes = new Array[Byte](bytes.length - 2)
-      System.arraycopy(bytes, 2, newBytes, 0, newBytes.length)
-      list += ((newBytes, i))
-      i = i + 1
-    }
-    // Sort based on the column data in order to create index.
-    val sorted = list
-      .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l._1, r._1) <= 0)
-    var oldValue: (Array[Byte], Seq[Int], Seq[Int]) = null
-    var addedLast: Boolean = false
-    // Merge all same column values to single row.
-    sorted.foreach { f =>
-      if (oldValue != null) {
-        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(f._1, oldValue._1) == 0) {
-          oldValue = (oldValue._1, oldValue._2 ++ Seq(f._2), oldValue._3)
-          addedLast = false
-        } else {
-          blockletList += oldValue
-          oldValue = (f._1, Seq(f._2), Seq(pageId))
-          addedLast = true
-        }
-      } else {
-        oldValue = (f._1, Seq(f._2), Seq(pageId))
-        addedLast = false
-      }
-    }
-    if (!addedLast && oldValue != null) {
-      blockletList += oldValue
-    }
-  }
-
-
-  /**
-   * This is called during closing of writer.So after this call no more data will be sent to this
-   * class.
-   */
-  override def finish(): Unit = {
-    val out = new ByteOutputStream()
-    val outStream = new ObjectOutputStream(out)
-    outStream.writeObject(maxMin)
-    outStream.close()
-    val bytes = compressor.compressByte(out.getBytes)
-    stream.write(bytes)
-    stream.writeInt(bytes.length)
-    stream.close()
-    commitFile(fgwritepath)
-  }
-
-
-}
-
-class FGDataMapTestCase extends QueryTest with BeforeAndAfterAll {
-
-  val file2 = resourcesPath + "/compaction/fil2.csv"
-
-  override protected def beforeAll(): Unit = {
-    //n should be about 5000000 of reset if size is default 1024
-    val n = 150000
-    CompactionSupportGlobalSortBigFileTest.createFile(file2, n * 4, n)
-    sql("DROP TABLE IF EXISTS normal_test")
-    sql(
-      """
-        | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT)
-        | STORED BY 'org.apache.carbondata.format'
-        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
-      """.stripMargin)
-    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')")
-  }
-
-  test("test fg datamap") {
-    sql("DROP TABLE IF EXISTS datamap_test")
-    sql(
-      """
-        | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
-        | STORED BY 'org.apache.carbondata.format'
-        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
-      """.stripMargin)
-    val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test")
-    // register datamap writer
-    sql(
-      s"""
-         | CREATE DATAMAP ggdatamap ON TABLE datamap_test
-         | USING '${classOf[FGDataMapFactory].getName}'
-         | DMPROPERTIES('indexcolumns'='name')
-       """.stripMargin)
-    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
-    checkAnswer(sql("select * from datamap_test where name='n502670'"),
-      sql("select * from normal_test where name='n502670'"))
-  }
-
-  test("test fg datamap with 2 datamaps ") {
-    sql("DROP TABLE IF EXISTS datamap_test")
-    sql(
-      """
-        | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
-        | STORED BY 'org.apache.carbondata.format'
-        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
-      """.stripMargin)
-    val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test")
-    // register datamap writer
-    sql(
-      s"""
-         | CREATE DATAMAP ggdatamap1 ON TABLE datamap_test
-         | USING '${classOf[FGDataMapFactory].getName}'
-         | DMPROPERTIES('indexcolumns'='name')
-       """.stripMargin)
-    sql(
-      s"""
-         | CREATE DATAMAP ggdatamap2 ON TABLE datamap_test
-         | USING '${classOf[FGDataMapFactory].getName}'
-         | DMPROPERTIES('indexcolumns'='city')
-       """.stripMargin)
-    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
-    checkAnswer(sql("select * from datamap_test where name='n502670' and city='c2670'"),
-      sql("select * from normal_test where name='n502670' and city='c2670'"))
-  }
-
-  override protected def afterAll(): Unit = {
-    CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
-    sql("DROP TABLE IF EXISTS normal_test")
-    sql("DROP TABLE IF EXISTS datamap_test")
-  }
-}

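The fine grain pruning above boils down to two steps: a coarse min/max check per blocklet, then a binary search over the sorted index values inside each surviving blocklet. A minimal, self-contained sketch of that idea follows; it is plain Scala, not the CarbonData API, and the names BlockletIndex, candidateBlocklets and containsKey are invented for illustration.

  object MinMaxPruneSketch {
    // per-blocklet summary: id, min/max of the indexed column, and its sorted values
    final case class BlockletIndex(blockletId: Int, min: String, max: String, sortedKeys: Vector[String])

    // coarse step: keep only blocklets whose [min, max] range can contain the key
    def candidateBlocklets(indexes: Seq[BlockletIndex], key: String): Seq[BlockletIndex] =
      indexes.filter(b => key >= b.min && key <= b.max)

    // fine step: binary search the sorted keys of one blocklet
    def containsKey(blocklet: BlockletIndex, key: String): Boolean = {
      import scala.collection.Searching._
      blocklet.sortedKeys.search(key) match {
        case Found(_) => true
        case _ => false
      }
    }

    def main(args: Array[String]): Unit = {
      val indexes = Seq(
        BlockletIndex(0, "a", "f", Vector("a", "c", "f")),
        BlockletIndex(1, "g", "m", Vector("g", "h", "m")))
      val hits = candidateBlocklets(indexes, "h").filter(containsKey(_, "h")).map(_.blockletId)
      println(hits) // List(1)
    }
  }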
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGIndexDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGIndexDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGIndexDataMapTestCase.scala
new file mode 100644
index 0000000..8ddad75
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGIndexDataMapTestCase.scala
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.datamap
+
+import java.io.{ByteArrayInputStream, DataOutputStream, ObjectInputStream, ObjectOutputStream}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.datamap.dev.fgdatamap.{AbstractFineGrainIndexDataMap, AbstractFineGrainIndexDataMapFactory}
+import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMapModel}
+import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta}
+import org.apache.carbondata.core.datastore.FileReader
+import org.apache.carbondata.core.datastore.block.SegmentProperties
+import org.apache.carbondata.core.datastore.compression.SnappyCompressor
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.datastore.page.ColumnPage
+import org.apache.carbondata.core.indexstore.FineGrainBlocklet
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
+import org.apache.carbondata.core.scan.expression.Expression
+import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
+import org.apache.carbondata.core.util.ByteUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.Event
+import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlobalSortBigFileTest
+
+class FGIndexDataMapFactory extends AbstractFineGrainIndexDataMapFactory {
+  var identifier: AbsoluteTableIdentifier = _
+  var dataMapSchema: DataMapSchema = _
+
+  /**
+   * Initialize the datamap factory with the table identifier and datamap schema
+   */
+  override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = {
+    this.identifier = identifier
+    this.dataMapSchema = dataMapSchema
+  }
+
+  /**
+   * Return a new writer for this datamap
+   */
+  override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter = {
+    new FGDataMapWriter(identifier, segmentId, dataWritePath, dataMapSchema)
+  }
+
+  /**
+   * Get the datamaps for the given segment id
+   */
+  override def getDataMaps(segmentId: String): java.util.List[AbstractFineGrainIndexDataMap] = {
+    val file = FileFactory
+      .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
+
+    val files = file.listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
+    })
+    files.map { f =>
+      val dataMap: AbstractFineGrainIndexDataMap = new FGIndexDataMap()
+      dataMap.init(new DataMapModel(f.getCanonicalPath))
+      dataMap
+    }.toList.asJava
+  }
+
+  /**
+   * Get the datamap for a distributable object.
+   */
+  override def getDataMaps(
+      distributable: DataMapDistributable): java.util.List[AbstractFineGrainIndexDataMap]= {
+    val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable]
+    val dataMap: AbstractFineGrainIndexDataMap = new FGIndexDataMap()
+    dataMap.init(new DataMapModel(mapDistributable.getFilePath))
+    Seq(dataMap).asJava
+  }
+
+  /**
+   * Get all distributable objects of a segment id
+   *
+   * @return
+   */
+  override def toDistributable(segmentId: String): java.util.List[DataMapDistributable] = {
+    val file = FileFactory
+      .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
+
+    val files = file.listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
+    })
+    files.map { f =>
+      val d: DataMapDistributable = new BlockletDataMapDistributable(f.getCanonicalPath)
+      d
+    }.toList.asJava
+  }
+
+
+  /**
+   * Fire an event to this datamap factory.
+   * @param event
+   */
+  override def fireEvent(event: Event):Unit = {
+    ???
+  }
+
+  /**
+   * Clear the datamaps of the given segment
+   */
+  override def clear(segmentId: String): Unit = {
+  }
+
+  /**
+   * Clear all datamaps from memory
+   */
+  override def clear(): Unit = {
+  }
+
+  /**
+   * Return metadata of this datamap
+   */
+  override def getMeta: DataMapMeta = {
+    new DataMapMeta(dataMapSchema.getProperties.get("indexcolumns").split(",").toList.asJava,
+      List(ExpressionType.EQUALS, ExpressionType.IN).asJava)
+  }
+}
+
+class FGIndexDataMap extends AbstractFineGrainIndexDataMap {
+
+  var maxMin: ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)] = _
+  var FileReader: FileReader = _
+  var filePath: String = _
+  val compressor = new SnappyCompressor
+
+  /**
+   * It is called to load the datamap into memory or to initialize it.
+   */
+  override def init(dataMapModel: DataMapModel): Unit = {
+    this.filePath = dataMapModel.getFilePath
+    val size = FileFactory.getCarbonFile(filePath).getSize
+    FileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath))
+    val footerLen = FileReader.readInt(filePath, size - 4)
+    val bytes = FileReader.readByteArray(filePath, size - footerLen - 4, footerLen)
+    val in = new ByteArrayInputStream(compressor.unCompressByte(bytes))
+    val obj = new ObjectInputStream(in)
+    maxMin = obj.readObject()
+      .asInstanceOf[ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)]]
+  }
+
+  /**
+   * Prune the datamap with the filter expression. It returns the list of
+   * fine grain blocklets where the filter values may exist.
+   *
+   * @param filterExp resolved filter expression tree
+   * @return pruned list of fine grain blocklets
+   */
+  override def prune(
+      filterExp: FilterResolverIntf,
+      segmentProperties: SegmentProperties,
+      partitions: java.util.List[String]): java.util.List[FineGrainBlocklet] = {
+    val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]()
+    val expression = filterExp.getFilterExpression
+    getEqualToExpression(expression, buffer)
+    val value = buffer.map { f =>
+      f.getChildren.get(1).evaluate(null).getString
+    }
+    val meta = findMeta(value(0).getBytes)
+    meta.map { f =>
+      readAndFindData(f, value(0).getBytes())
+    }.filter(_.isDefined).map(_.get).asJava
+  }
+
+  private def readAndFindData(meta: (String, Int, (Array[Byte], Array[Byte]), Long, Int),
+      value: Array[Byte]): Option[FineGrainBlocklet] = {
+    val bytes = FileReader.readByteArray(filePath, meta._4, meta._5)
+    val outputStream = new ByteArrayInputStream(compressor.unCompressByte(bytes))
+    val obj = new ObjectInputStream(outputStream)
+    val blockletsData = obj.readObject()
+      .asInstanceOf[ArrayBuffer[(Array[Byte], Seq[Seq[Int]], Seq[Int])]]
+
+    import scala.collection.Searching._
+    val searching = blockletsData
+      .search[(Array[Byte], Seq[Seq[Int]], Seq[Int])]((value, Seq(Seq(0)), Seq(0)))(new Ordering[
+      (Array[Byte], Seq[Seq[Int]], Seq[Int])] {
+      override def compare(x: (Array[Byte], Seq[Seq[Int]], Seq[Int]),
+          y: (Array[Byte], Seq[Seq[Int]], Seq[Int])) = {
+        ByteUtil.UnsafeComparer.INSTANCE.compareTo(x._1, y._1)
+      }
+    })
+    if (searching.insertionPoint >= 0) {
+      val f = blockletsData(searching.insertionPoint)
+      val pages = f._3.zipWithIndex.map { p =>
+        val pg = new FineGrainBlocklet.Page
+        pg.setPageId(p._1)
+        pg.setRowId(f._2(p._2).toArray)
+        pg
+      }
+      pages
+      Some(new FineGrainBlocklet(meta._1, meta._2.toString, pages.toList.asJava))
+    } else {
+      None
+    }
+
+  }
+
+  private def findMeta(value: Array[Byte]) = {
+    val tuples = maxMin.filter { f =>
+      ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._1) >= 0 &&
+      ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._2) <= 0
+    }
+    tuples
+  }
+
+  def getEqualToExpression(expression: Expression, buffer: ArrayBuffer[Expression]): Unit = {
+    if (expression.isInstanceOf[EqualToExpression]) {
+      buffer += expression
+    } else {
+      if (expression.getChildren != null) {
+        expression.getChildren.asScala.map { f =>
+          if (f.isInstanceOf[EqualToExpression]) {
+            buffer += f
+          }
+          getEqualToExpression(f, buffer)
+        }
+      }
+    }
+  }
+
+  /**
+   * Clear complete index table and release memory.
+   */
+  override def clear():Unit = {
+    ???
+  }
+
+  override def isScanRequired(filterExp: FilterResolverIntf): Boolean = ???
+}
+
+class FGDataMapWriter(identifier: AbsoluteTableIdentifier,
+    segmentId: String, dataWriterPath: String, dataMapSchema: DataMapSchema)
+  extends AbstractDataMapWriter(identifier, segmentId, dataWriterPath) {
+
+  var currentBlockId: String = null
+  val fgwritepath = dataWriterPath + "/" + dataMapSchema.getDataMapName + System.nanoTime() +
+                    ".datamap"
+  val stream: DataOutputStream = FileFactory
+    .getDataOutputStream(fgwritepath, FileFactory.getFileType(fgwritepath))
+  val blockletList = new ArrayBuffer[(Array[Byte], Seq[Int], Seq[Int])]()
+  val maxMin = new ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)]()
+  var position: Long = 0
+  val compressor = new SnappyCompressor
+
+  /**
+   * Start of new block notification.
+   *
+   * @param blockId file name of the carbondata file
+   */
+  override def onBlockStart(blockId: String): Unit = {
+    currentBlockId = blockId
+  }
+
+  /**
+   * End of block notification
+   */
+  override def onBlockEnd(blockId: String): Unit = {
+
+  }
+
+  /**
+   * Start of new blocklet notification.
+   *
+   * @param blockletId sequence number of blocklet in the block
+   */
+  override def onBlockletStart(blockletId: Int): Unit = {
+
+  }
+
+  /**
+   * End of blocklet notification
+   *
+   * @param blockletId sequence number of blocklet in the block
+   */
+  override def onBlockletEnd(blockletId: Int): Unit = {
+    val sorted = blockletList
+      .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l._1, r._1) <= 0)
+    var oldValue: (Array[Byte], Seq[Seq[Int]], Seq[Int]) = null
+    var addedLast: Boolean = false
+    val blockletListUpdated = new ArrayBuffer[(Array[Byte], Seq[Seq[Int]], Seq[Int])]()
+    // Merge all same column values to single row.
+    sorted.foreach { f =>
+      if (oldValue != null) {
+        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(f._1, oldValue._1) == 0) {
+          oldValue = (oldValue._1, oldValue._2 ++ Seq(f._2), oldValue._3 ++ f._3)
+          addedLast = false
+        } else {
+          blockletListUpdated += oldValue
+          oldValue = (f._1, Seq(f._2), f._3)
+          addedLast = true
+        }
+      } else {
+        oldValue = (f._1, Seq(f._2), f._3)
+        addedLast = false
+      }
+    }
+    if (!addedLast && oldValue != null) {
+      blockletListUpdated += oldValue
+    }
+
+    val out = new ByteOutputStream()
+    val outStream = new ObjectOutputStream(out)
+    outStream.writeObject(blockletListUpdated)
+    outStream.close()
+    val bytes = compressor.compressByte(out.getBytes)
+    stream.write(bytes)
+    maxMin +=
+    ((currentBlockId + "", blockletId, (blockletListUpdated.head._1, blockletListUpdated.last
+      ._1), position, bytes.length))
+    position += bytes.length
+    blockletList.clear()
+  }
+
+  /**
+   * Add the column pages of a row to the datamap. The order of pages is the same as the
+   * `indexColumns` in the DataMapMeta returned by IndexDataMapFactory.
+   *
+   * Implementations should copy the content of `pages` as needed, because the `pages` memory
+   * may be freed after this method returns if an unsafe column page is used.
+   */
+  override def onPageAdded(blockletId: Int,
+      pageId: Int,
+      pages: Array[ColumnPage]): Unit = {
+    val size = pages(0).getPageSize
+    val list = new ArrayBuffer[(Array[Byte], Int)]()
+    var i = 0
+    while (i < size) {
+      val bytes = pages(0).getBytes(i)
+      val newBytes = new Array[Byte](bytes.length - 2)
+      System.arraycopy(bytes, 2, newBytes, 0, newBytes.length)
+      list += ((newBytes, i))
+      i = i + 1
+    }
+    // Sort based on the column data in order to create index.
+    val sorted = list
+      .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l._1, r._1) <= 0)
+    var oldValue: (Array[Byte], Seq[Int], Seq[Int]) = null
+    var addedLast: Boolean = false
+    // Merge all same column values to single row.
+    sorted.foreach { f =>
+      if (oldValue != null) {
+        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(f._1, oldValue._1) == 0) {
+          oldValue = (oldValue._1, oldValue._2 ++ Seq(f._2), oldValue._3)
+          addedLast = false
+        } else {
+          blockletList += oldValue
+          oldValue = (f._1, Seq(f._2), Seq(pageId))
+          addedLast = true
+        }
+      } else {
+        oldValue = (f._1, Seq(f._2), Seq(pageId))
+        addedLast = false
+      }
+    }
+    if (!addedLast && oldValue != null) {
+      blockletList += oldValue
+    }
+  }
+
+
+  /**
+   * This is called while closing the writer. After this call, no more data will be sent to
+   * this class.
+   */
+  override def finish(): Unit = {
+    val out = new ByteOutputStream()
+    val outStream = new ObjectOutputStream(out)
+    outStream.writeObject(maxMin)
+    outStream.close()
+    val bytes = compressor.compressByte(out.getBytes)
+    stream.write(bytes)
+    stream.writeInt(bytes.length)
+    stream.close()
+    commitFile(fgwritepath)
+  }
+}
+
+class FGIndexDataMapTestCase extends QueryTest with BeforeAndAfterAll {
+
+  val file2 = resourcesPath + "/compaction/fil2.csv"
+
+  override protected def beforeAll(): Unit = {
+    // n should be about 5000000, or reset it if the default size of 1024 is used
+    val n = 150000
+    CompactionSupportGlobalSortBigFileTest.createFile(file2, n * 4, n)
+    sql("DROP TABLE IF EXISTS normal_test")
+    sql(
+      """
+        | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')")
+  }
+
+  test("test fg datamap") {
+    sql("DROP TABLE IF EXISTS datamap_test")
+    sql(
+      """
+        | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test")
+    // register datamap writer
+    sql(
+      s"""
+         | CREATE DATAMAP ggdatamap ON TABLE datamap_test
+         | USING '${classOf[FGIndexDataMapFactory].getName}'
+         | DMPROPERTIES('indexcolumns'='name')
+       """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
+    checkAnswer(sql("select * from datamap_test where name='n502670'"),
+      sql("select * from normal_test where name='n502670'"))
+  }
+
+  test("test fg datamap with 2 datamaps ") {
+    sql("DROP TABLE IF EXISTS datamap_test")
+    sql(
+      """
+        | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test")
+    // register datamap writer
+    sql(
+      s"""
+         | CREATE DATAMAP ggdatamap1 ON TABLE datamap_test
+         | USING '${classOf[FGIndexDataMapFactory].getName}'
+         | DMPROPERTIES('indexcolumns'='name')
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP ggdatamap2 ON TABLE datamap_test
+         | USING '${classOf[FGIndexDataMapFactory].getName}'
+         | DMPROPERTIES('indexcolumns'='city')
+       """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
+    checkAnswer(sql("select * from datamap_test where name='n502670' and city='c2670'"),
+      sql("select * from normal_test where name='n502670' and city='c2670'"))
+  }
+
+  override protected def afterAll(): Unit = {
+    CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
+    sql("DROP TABLE IF EXISTS normal_test")
+    sql("DROP TABLE IF EXISTS datamap_test")
+  }
+}

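The FGDataMapWriter/FGIndexDataMap pair above uses a simple footer layout: the serialized max/min summary is written after the per-blocklet index entries, followed by a trailing 4-byte length, so init() can locate the footer by reading the last 4 bytes first. The sketch below shows only that layout, using plain java.io and leaving out the Snappy compression; FooterTrailerSketch and its method names are placeholders, not CarbonData classes.

  import java.io._

  object FooterTrailerSketch {
    def writeWithFooter(path: String, data: Array[Byte], footer: Array[Byte]): Unit = {
      val out = new DataOutputStream(new FileOutputStream(path))
      try {
        out.write(data)             // per-blocklet index entries
        out.write(footer)           // serialized max/min summary
        out.writeInt(footer.length) // trailing 4-byte footer length
      } finally out.close()
    }

    def readFooter(path: String): Array[Byte] = {
      val raf = new RandomAccessFile(path, "r")
      try {
        raf.seek(raf.length() - 4)             // length lives in the last 4 bytes
        val footerLen = raf.readInt()
        val footer = new Array[Byte](footerLen)
        raf.seek(raf.length() - 4 - footerLen) // footer sits just before the length
        raf.readFully(footer)
        footer
      } finally raf.close()
    }

    def main(args: Array[String]): Unit = {
      val path = File.createTempFile("fg_index_sketch", ".datamap").getAbsolutePath
      writeWithFooter(path, "payload".getBytes("UTF-8"), "footer".getBytes("UTF-8"))
      println(new String(readFooter(path), "UTF-8")) // prints: footer
    }
  }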
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/IndexDataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/IndexDataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/IndexDataMapWriterSuite.scala
new file mode 100644
index 0000000..5fd8ae9
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/IndexDataMapWriterSuite.scala
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.datamap
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.{DataFrame, SaveMode}
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter
+import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainIndexDataMap, AbstractCoarseGrainIndexDataMapFactory}
+import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta}
+import org.apache.carbondata.core.datastore.page.ColumnPage
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType
+import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.events.Event
+
+class C2IndexDataMapFactory() extends AbstractCoarseGrainIndexDataMapFactory {
+
+  var identifier: AbsoluteTableIdentifier = _
+
+  override def init(identifier: AbsoluteTableIdentifier,
+      dataMapSchema: DataMapSchema): Unit = {
+    this.identifier = identifier
+  }
+
+  override def fireEvent(event: Event): Unit = ???
+
+  override def clear(segmentId: String): Unit = {}
+
+  override def clear(): Unit = {}
+
+  override def getDataMaps(distributable: DataMapDistributable): java.util.List[AbstractCoarseGrainIndexDataMap] = ???
+
+  override def getDataMaps(segmentId: String): util.List[AbstractCoarseGrainIndexDataMap] = ???
+
+  override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter =
+    IndexDataMapWriterSuite.dataMapWriterC2Mock(identifier, segmentId, dataWritePath)
+
+  override def getMeta: DataMapMeta = new DataMapMeta(List("c2").asJava, List(ExpressionType.EQUALS).asJava)
+
+  /**
+   * Get all distributable objects of a segmentid
+   *
+   * @return
+   */
+  override def toDistributable(segmentId: String): util.List[DataMapDistributable] = {
+    ???
+  }
+
+}
+
+class IndexDataMapWriterSuite extends QueryTest with BeforeAndAfterAll {
+  def buildTestData(numRows: Int): DataFrame = {
+    import sqlContext.implicits._
+    sqlContext.sparkContext.parallelize(1 to numRows, 1)
+      .map(x => ("a" + x, "b", x))
+      .toDF("c1", "c2", "c3")
+  }
+
+  def dropTable(): Unit = {
+    sql("DROP TABLE IF EXISTS carbon1")
+    sql("DROP TABLE IF EXISTS carbon2")
+  }
+
+  override def beforeAll {
+    dropTable()
+  }
+
+  test("test write datamap 2 pages") {
+    sql(s"CREATE TABLE carbon1(c1 STRING, c2 STRING, c3 INT) STORED BY 'org.apache.carbondata.format'")
+    // register datamap writer
+    sql(s"CREATE DATAMAP test ON TABLE carbon1 USING '${classOf[C2IndexDataMapFactory].getName}'")
+    val df = buildTestData(33000)
+
+    // save dataframe to carbon file
+    df.write
+      .format("carbondata")
+      .option("tableName", "carbon1")
+      .option("tempCSV", "false")
+      .option("sort_columns","c1")
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    assert(IndexDataMapWriterSuite.callbackSeq.head.contains("block start"))
+    assert(IndexDataMapWriterSuite.callbackSeq.last.contains("block end"))
+    assert(
+      IndexDataMapWriterSuite.callbackSeq.slice(1, IndexDataMapWriterSuite.callbackSeq.length - 1) == Seq(
+        "blocklet start 0",
+        "add page data: blocklet 0, page 0",
+        "add page data: blocklet 0, page 1",
+        "blocklet end: 0"
+      ))
+    IndexDataMapWriterSuite.callbackSeq = Seq()
+  }
+
+  test("test write datamap 2 blocklet") {
+    sql(s"CREATE TABLE carbon2(c1 STRING, c2 STRING, c3 INT) STORED BY 'org.apache.carbondata.format'")
+    sql(s"CREATE DATAMAP test ON TABLE carbon2 USING '${classOf[C2IndexDataMapFactory].getName}'")
+
+    CarbonProperties.getInstance()
+      .addProperty("carbon.blockletgroup.size.in.mb", "1")
+    CarbonProperties.getInstance()
+      .addProperty("carbon.number.of.cores.while.loading",
+        CarbonCommonConstants.NUM_CORES_DEFAULT_VAL)
+
+    val df = buildTestData(300000)
+
+    // save dataframe to carbon file
+    df.write
+      .format("carbondata")
+      .option("tableName", "carbon2")
+      .option("tempCSV", "false")
+      .option("sort_columns","c1")
+      .option("SORT_SCOPE","GLOBAL_SORT")
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    assert(IndexDataMapWriterSuite.callbackSeq.head.contains("block start"))
+    assert(IndexDataMapWriterSuite.callbackSeq.last.contains("block end"))
+    // corrected test case: the minimum "carbon.blockletgroup.size.in.mb" value cannot be
+    // less than 64 MB
+    assert(
+      IndexDataMapWriterSuite.callbackSeq.slice(1, IndexDataMapWriterSuite.callbackSeq.length - 1) == Seq(
+        "blocklet start 0",
+        "add page data: blocklet 0, page 0",
+        "add page data: blocklet 0, page 1",
+        "add page data: blocklet 0, page 2",
+        "add page data: blocklet 0, page 3",
+        "add page data: blocklet 0, page 4",
+        "add page data: blocklet 0, page 5",
+        "add page data: blocklet 0, page 6",
+        "add page data: blocklet 0, page 7",
+        "add page data: blocklet 0, page 8",
+        "add page data: blocklet 0, page 9",
+        "blocklet end: 0"
+      ))
+    IndexDataMapWriterSuite.callbackSeq = Seq()
+  }
+
+  override def afterAll {
+    dropTable()
+  }
+}
+
+object IndexDataMapWriterSuite {
+
+  var callbackSeq: Seq[String] = Seq[String]()
+
+  def dataMapWriterC2Mock(identifier: AbsoluteTableIdentifier, segmentId: String,
+      dataWritePath: String) =
+    new AbstractDataMapWriter(identifier, segmentId, dataWritePath) {
+
+    override def onPageAdded(
+        blockletId: Int,
+        pageId: Int,
+        pages: Array[ColumnPage]): Unit = {
+      assert(pages.length == 1)
+      assert(pages(0).getDataType == DataTypes.STRING)
+      val bytes: Array[Byte] = pages(0).getByteArrayPage()(0)
+      assert(bytes.sameElements(Seq(0, 1, 'b'.toByte)))
+      callbackSeq :+= s"add page data: blocklet $blockletId, page $pageId"
+    }
+
+    override def onBlockletEnd(blockletId: Int): Unit = {
+      callbackSeq :+= s"blocklet end: $blockletId"
+    }
+
+    override def onBlockEnd(blockId: String): Unit = {
+      callbackSeq :+= s"block end $blockId"
+    }
+
+    override def onBlockletStart(blockletId: Int): Unit = {
+      callbackSeq :+= s"blocklet start $blockletId"
+    }
+
+    /**
+     * Start of new block notification.
+     *
+     * @param blockId file name of the carbondata file
+     */
+    override def onBlockStart(blockId: String) = {
+      callbackSeq :+= s"block start $blockId"
+    }
+
+    /**
+     * This is called while closing the writer. After this call, no more data will be sent to
+     * this class.
+     */
+    override def finish() = {
+
+    }
+  }
+}
\ No newline at end of file
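The suite above asserts the callback order block start -> blocklet start -> page added (per page) -> blocklet end -> block end. The stand-in listener below records that sequence the same way callbackSeq does; the trait and class are illustrative only and do not mirror the exact AbstractDataMapWriter signature (the ColumnPage arguments are dropped).

  object WriterCallbackOrderSketch {
    trait PageListener {
      def onBlockStart(blockId: String): Unit
      def onBlockletStart(blockletId: Int): Unit
      def onPageAdded(blockletId: Int, pageId: Int): Unit
      def onBlockletEnd(blockletId: Int): Unit
      def onBlockEnd(blockId: String): Unit
    }

    // records callbacks in arrival order, like IndexDataMapWriterSuite.callbackSeq
    class RecordingListener extends PageListener {
      var events: Seq[String] = Seq.empty
      def onBlockStart(blockId: String): Unit = events :+= s"block start $blockId"
      def onBlockletStart(blockletId: Int): Unit = events :+= s"blocklet start $blockletId"
      def onPageAdded(blockletId: Int, pageId: Int): Unit =
        events :+= s"add page data: blocklet $blockletId, page $pageId"
      def onBlockletEnd(blockletId: Int): Unit = events :+= s"blocklet end: $blockletId"
      def onBlockEnd(blockId: String): Unit = events :+= s"block end $blockId"
    }

    def main(args: Array[String]): Unit = {
      val listener = new RecordingListener
      listener.onBlockStart("part-0")                      // blockId here is a made-up file name
      listener.onBlockletStart(0)
      (0 until 2).foreach(p => listener.onPageAdded(0, p)) // two pages, as in the 33000-row test
      listener.onBlockletEnd(0)
      listener.onBlockEnd("part-0")
      listener.events.foreach(println)
    }
  }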