Posted to commits@carbondata.apache.org by ma...@apache.org on 2018/05/17 14:30:22 UTC
[09/50] [abbrv] carbondata git commit: [CARBONDATA-2416] Support DEFERRED REBUILD when creating DataMap
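As exercised by the tests and parser changes below, this commit adds a WITH DEFERRED REBUILD clause to CREATE DATAMAP and renames the REFRESH DATAMAP command to REBUILD DATAMAP. A minimal usage sketch in the style of the test suite (the table name, datamap name, and provider class are illustrative, not from the commit):

    // a deferred datamap starts in DISABLED state and is not built during creation
    sql(
      """CREATE DATAMAP dm ON TABLE t
        |USING 'com.example.MyDataMapFactory'
        |WITH DEFERRED REBUILD
        |DMPROPERTIES('index_columns'='name')
        |""".stripMargin)

    // build the datamap for all existing segments; on success it becomes ENABLED
    sql("REBUILD DATAMAP dm ON TABLE t")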
http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
index 67effda..cccfb3f 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
@@ -17,6 +17,7 @@
package org.apache.carbondata.spark.testsuite.datamap
+import java.io.File
import java.util
import scala.collection.JavaConverters._
@@ -26,14 +27,12 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.dev.{DataMapRefresher, DataMapWriter}
import org.apache.carbondata.core.datamap.dev.cgdatamap.{CoarseGrainDataMap, CoarseGrainDataMapFactory}
+import org.apache.carbondata.core.datamap.dev.{DataMapBuilder, DataMapWriter}
import org.apache.carbondata.core.datamap.status.{DataMapStatus, DataMapStatusManager}
import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, Segment}
import org.apache.carbondata.core.datastore.page.ColumnPage
import org.apache.carbondata.core.features.TableOperation
-import org.apache.carbondata.core.datastore.row.CarbonRow
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
import org.apache.carbondata.core.scan.filter.intf.ExpressionType
import org.apache.carbondata.core.util.CarbonProperties
@@ -44,10 +43,11 @@ class TestDataMapStatus extends QueryTest with BeforeAndAfterAll {
val testData = s"$resourcesPath/sample.csv"
override def beforeAll: Unit = {
+ new File(CarbonProperties.getInstance().getSystemFolderLocation).delete()
drop
}
- test("datamap status disable for new datamap") {
+ test("datamap status enable for new datamap") {
sql("DROP TABLE IF EXISTS datamapstatustest")
sql(
"""
@@ -64,11 +64,33 @@ class TestDataMapStatus extends QueryTest with BeforeAndAfterAll {
assert(details.length == 1)
+ assert(details.exists(p => p.getDataMapName.equals("statusdatamap") && p.getStatus == DataMapStatus.ENABLED))
+ sql("DROP TABLE IF EXISTS datamapstatustest")
+ }
+
+ test("datamap status disable for new datamap with deferred rebuild") {
+ sql("DROP TABLE IF EXISTS datamapstatustest")
+ sql(
+ """
+ | CREATE TABLE datamapstatustest(id int, name string, city string, age int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+ sql(
+ s"""create datamap statusdatamap on table datamapstatustest
+ |using '${classOf[TestDataMapFactory].getName}'
+ |with deferred rebuild
+ |dmproperties('index_columns'='name')
+ | """.stripMargin)
+
+ val details = DataMapStatusManager.readDataMapStatusDetails()
+
+ assert(details.length == 1)
+
assert(details.exists(p => p.getDataMapName.equals("statusdatamap") && p.getStatus == DataMapStatus.DISABLED))
sql("DROP TABLE IF EXISTS datamapstatustest")
}
- test("datamap status disable after new load") {
+ test("datamap status disable after new load with deferred rebuild") {
sql("DROP TABLE IF EXISTS datamapstatustest1")
sql(
"""
@@ -78,6 +100,7 @@ class TestDataMapStatus extends QueryTest with BeforeAndAfterAll {
sql(
s"""create datamap statusdatamap1 on table datamapstatustest1
|using '${classOf[TestDataMapFactory].getName}'
+ |with deferred rebuild
|dmproperties('index_columns'='name')
| """.stripMargin)
@@ -94,8 +117,7 @@ class TestDataMapStatus extends QueryTest with BeforeAndAfterAll {
sql("DROP TABLE IF EXISTS datamapstatustest1")
}
- // enable it in PR2255
- ignore("datamap status with refresh datamap") {
+ test("datamap status with REBUILD DATAMAP") {
sql("DROP TABLE IF EXISTS datamapstatustest2")
sql(
"""
@@ -105,6 +127,7 @@ class TestDataMapStatus extends QueryTest with BeforeAndAfterAll {
sql(
s"""create datamap statusdatamap2 on table datamapstatustest2
|using '${classOf[TestDataMapFactory].getName}'
+ |with deferred rebuild
|dmproperties('index_columns'='name')
| """.stripMargin)
@@ -119,7 +142,7 @@ class TestDataMapStatus extends QueryTest with BeforeAndAfterAll {
assert(details.length == 1)
assert(details.exists(p => p.getDataMapName.equals("statusdatamap2") && p.getStatus == DataMapStatus.DISABLED))
- sql(s"refresh datamap statusdatamap2 on table datamapstatustest2")
+ sql(s"REBUILD DATAMAP statusdatamap2 on table datamapstatustest2")
details = DataMapStatusManager.readDataMapStatusDetails()
assert(details.length == 1)
@@ -128,8 +151,7 @@ class TestDataMapStatus extends QueryTest with BeforeAndAfterAll {
sql("DROP TABLE IF EXISTS datamapstatustest2")
}
- // enable it in PR2255
- ignore("datamap create without on table test") {
+ test("datamap create without on table test") {
sql("DROP TABLE IF EXISTS datamapstatustest3")
sql(
"""
@@ -144,10 +166,20 @@ class TestDataMapStatus extends QueryTest with BeforeAndAfterAll {
| """.stripMargin)
}
+ sql("DROP TABLE IF EXISTS datamapstatustest3")
+ }
+ test("rebuild datamap status") {
+ sql("DROP TABLE IF EXISTS datamapstatustest3")
+ sql(
+ """
+ | CREATE TABLE datamapstatustest3(id int, name string, city string, age int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
sql(
s"""create datamap statusdatamap3 on table datamapstatustest3
|using '${classOf[TestDataMapFactory].getName}'
+ |with deferred rebuild
|dmproperties('index_columns'='name')
| """.stripMargin)
@@ -162,7 +194,7 @@ class TestDataMapStatus extends QueryTest with BeforeAndAfterAll {
assert(details.length == 1)
assert(details.exists(p => p.getDataMapName.equals("statusdatamap3") && p.getStatus == DataMapStatus.DISABLED))
- sql(s"refresh datamap statusdatamap3")
+ sql(s"REBUILD DATAMAP statusdatamap3")
details = DataMapStatusManager.readDataMapStatusDetails()
assert(details.length == 1)
@@ -245,8 +277,19 @@ class TestDataMapFactory(
false
}
- override def createRefresher(segment: Segment,
- shardName: String): DataMapRefresher = {
- ???
+ override def createBuilder(segment: Segment,
+ shardName: String): DataMapBuilder = {
+ return new DataMapBuilder {
+ override def initialize(): Unit = { }
+
+ override def addRow(blockletId: Int,
+ pageId: Int,
+ rowId: Int,
+ values: Array[AnyRef]): Unit = { }
+
+ override def finish(): Unit = { }
+
+ override def close(): Unit = { }
+ }
}
}
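The no-op DataMapBuilder above mirrors the lifecycle that IndexDataMapRebuildRDD (added later in this commit) drives when rebuilding one shard of a segment. A sketch of that call sequence, assuming dataMapFactory, reader, segmentId and shardName as they appear in the RDD's compute method:

    val builder = dataMapFactory.createBuilder(new Segment(segmentId), shardName)
    builder.initialize()
    while (reader.nextKeyValue()) {
      // pageId and rowId arrive as the last two values of each row because the
      // query model is created with setRequiredRowId(true)
      builder.addRow(blockletId, pageId, rowId, rowValues)
    }
    builder.finish()
    builder.close()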
http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
index c193fcf..2f3488e 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
@@ -28,8 +28,6 @@ import org.apache.carbondata.spark.util.DataGenerator
* Test Suite for search mode
*/
-// TODO: Need to Fix
-@Ignore
class SearchModeTestCase extends QueryTest with BeforeAndAfterAll {
val numRows = 500 * 1000
http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
index 9c5297d..3cabc7b 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
@@ -24,11 +24,12 @@ import java.util.concurrent.{Callable, ExecutorService, Executors, Future}
import scala.collection.JavaConverters._
import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.dev.{DataMapRefresher, DataMapWriter}
+import org.apache.carbondata.core.datamap.dev.{DataMapBuilder, DataMapWriter}
import org.apache.carbondata.core.datamap.dev.cgdatamap.{CoarseGrainDataMap, CoarseGrainDataMapFactory}
import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, Segment}
import org.apache.carbondata.core.datastore.page.ColumnPage
@@ -43,18 +44,30 @@ import org.apache.carbondata.events.Event
// This test suite tests insert and insert overwrite running concurrently with other commands
class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
private val executorService: ExecutorService = Executors.newFixedThreadPool(10)
- var df: DataFrame = _
+ var testData: DataFrame = _
override def beforeAll {
dropTable()
buildTestData()
+ createTable("orders", testData.schema)
+ createTable("orders_overwrite", testData.schema)
sql(
s"""
| create datamap test on table orders
| using '${classOf[WaitingDataMapFactory].getName}'
| dmproperties('index_columns'='o_name')
""".stripMargin)
+
+ testData.write
+ .format("carbondata")
+ .option("tableName", "temp_table")
+ .option("tempCSV", "false")
+ .mode(SaveMode.Overwrite)
+ .save()
+
+ sql(s"insert into orders select * from temp_table")
+ sql(s"insert into orders_overwrite select * from temp_table")
}
private def buildTestData(): Unit = {
@@ -66,23 +79,17 @@ class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterA
import sqlContext.implicits._
val sdf = new SimpleDateFormat("yyyy-MM-dd")
- df = sqlContext.sparkSession.sparkContext.parallelize(1 to 150000)
+ testData = sqlContext.sparkSession.sparkContext.parallelize(1 to 150000)
.map(value => (value, new java.sql.Date(sdf.parse("2015-07-" + (value % 10 + 10)).getTime),
"china", "aaa" + value, "phone" + 555 * value, "ASD" + (60000 + value), 14999 + value,
"ordersTable" + value))
.toDF("o_id", "o_date", "o_country", "o_name",
"o_phonetype", "o_serialname", "o_salary", "o_comment")
- createTable("orders")
- createTable("orders_overwrite")
}
- private def createTable(tableName: String): Unit = {
- df.write
- .format("carbondata")
- .option("tableName", tableName)
- .option("tempCSV", "false")
- .mode(SaveMode.Overwrite)
- .save()
+ private def createTable(tableName: String, schema: StructType): Unit = {
+ val schemaString = schema.fields.map(x => x.name + " " + x.dataType.typeName).mkString(", ")
+ sql(s"CREATE TABLE $tableName ($schemaString) stored as carbondata")
}
override def afterAll {
@@ -91,7 +98,7 @@ class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterA
}
override def beforeEach(): Unit = {
- Global.overwriteRunning = false
+ Global.loading = false
}
private def dropTable() = {
@@ -101,12 +108,12 @@ class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterA
// run the input SQL and block until it is running
private def runSqlAsync(sql: String): Future[String] = {
- assert(!Global.overwriteRunning)
+ assert(!Global.loading)
var count = 0
val future = executorService.submit(
new QueryTask(sql)
)
- while (!Global.overwriteRunning && count < 1000) {
+ while (!Global.loading && count < 1000) {
Thread.sleep(10)
// to avoid dead loop in case WaitingDataMapFactory is not invoked
count += 1
@@ -202,9 +209,7 @@ class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterA
sql("drop table if exists t1")
// number of segment is 1 after createTable
- createTable("t1")
- // number of segment is 2 after insert
- sql("insert into table t1 select * from orders_overwrite")
+ createTable("t1", testData.schema)
sql(
s"""
@@ -212,6 +217,12 @@ class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterA
| using '${classOf[WaitingDataMapFactory].getName}'
| dmproperties('index_columns'='o_name')
""".stripMargin)
+
+ sql("insert into table t1 select * from orders_overwrite")
+ Thread.sleep(1100)
+ sql("insert into table t1 select * from orders_overwrite")
+ Thread.sleep(1100)
+
val future = runSqlAsync("insert into table t1 select * from orders_overwrite")
sql("alter table t1 compact 'MAJOR'")
assert(future.get.contains("PASS"))
@@ -279,15 +290,13 @@ class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterA
}
object Global {
- var overwriteRunning = false
+ var loading = false
}
class WaitingDataMapFactory(
carbonTable: CarbonTable,
dataMapSchema: DataMapSchema) extends CoarseGrainDataMapFactory(carbonTable, dataMapSchema) {
- private var identifier: AbsoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-
override def fireEvent(event: Event): Unit = ???
override def clear(segmentId: Segment): Unit = {}
@@ -311,14 +320,14 @@ class WaitingDataMapFactory(
override def onBlockStart(blockId: String): Unit = {
// trigger the second SQL to execute
- Global.overwriteRunning = true
+ Global.loading = true
// wait for 1 second to let the second SQL finish
Thread.sleep(1000)
}
override def finish(): Unit = {
-
+ Global.loading = false
}
}
}
@@ -341,8 +350,8 @@ class WaitingDataMapFactory(
false
}
- override def createRefresher(segment: Segment,
- shardName: String): DataMapRefresher = {
+ override def createBuilder(segment: Segment,
+ shardName: String): DataMapBuilder = {
???
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index fea3482..890f8fc 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -181,7 +181,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
protected val ON = carbonKeyWord("ON")
protected val DMPROPERTIES = carbonKeyWord("DMPROPERTIES")
protected val SELECT = carbonKeyWord("SELECT")
- protected val REFRESH = carbonKeyWord("REFRESH")
+ protected val REBUILD = carbonKeyWord("REBUILD")
+ protected val DEFERRED = carbonKeyWord("DEFERRED")
protected val doubleQuotedString = "\"([^\"]+)\"".r
protected val singleQuotedString = "'([^']+)'".r
http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProperty.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProperty.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProperty.java
deleted file mode 100644
index 043acb1..0000000
--- a/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProperty.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.datamap;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-
-/**
- * Property that can be specified when creating DataMap
- */
-@InterfaceAudience.Internal
-public class DataMapProperty {
-
- /**
- * Used to specify the store location of the datamap
- */
- public static final String PARTITIONING = "partitioning";
- public static final String PATH = "path";
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
index a5124a0..0642e01 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
@@ -105,7 +105,7 @@ public class IndexDataMapProvider extends DataMapProvider {
@Override
public void rebuild() {
- IndexDataMapRefreshRDD.rebuildDataMap(sparkSession, getMainTable(), getDataMapSchema());
+ IndexDataMapRebuildRDD.rebuildDataMap(sparkSession, getMainTable(), getDataMapSchema());
}
@Override
http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java
index 746a361..37d49e5 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java
@@ -17,11 +17,15 @@
package org.apache.carbondata.datamap;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
import org.apache.carbondata.core.datamap.DataMapCatalog;
import org.apache.carbondata.core.datamap.DataMapProvider;
import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapProperty;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
@@ -55,9 +59,11 @@ public class PreAggregateDataMapProvider extends DataMapProvider {
private void validateDmProperty(DataMapSchema dataMapSchema)
throws MalformedDataMapCommandException {
if (!dataMapSchema.getProperties().isEmpty()) {
- if (dataMapSchema.getProperties().size() > 2 || (
- !dataMapSchema.getProperties().containsKey(DataMapProperty.PATH) &&
- !dataMapSchema.getProperties().containsKey(DataMapProperty.PARTITIONING))) {
+ Map<String, String> properties = new HashMap<>(dataMapSchema.getProperties());
+ properties.remove(DataMapProperty.DEFERRED_REBUILD);
+ properties.remove(DataMapProperty.PATH);
+ properties.remove(DataMapProperty.PARTITIONING);
+ if (properties.size() > 0) {
throw new MalformedDataMapCommandException(
"Only 'path' and 'partitioning' dmproperties are allowed for this datamap");
}
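The rewritten check above is an allow-list: copy the user-supplied dmproperties, remove every permitted key, and reject if anything remains. The same pattern as a Scala sketch (property constants and the exception type as in the diff):

    import scala.collection.JavaConverters._

    val allowed = Set(DataMapProperty.DEFERRED_REBUILD, DataMapProperty.PATH,
      DataMapProperty.PARTITIONING)
    val unknown = dataMapSchema.getProperties.asScala.keySet -- allowed
    if (unknown.nonEmpty) {
      throw new MalformedDataMapCommandException(
        "Only 'path' and 'partitioning' dmproperties are allowed for this datamap")
    }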
http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
new file mode 100644
index 0000000..5902783
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap
+
+import java.io.{File, IOException}
+import java.text.SimpleDateFormat
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptID, TaskType}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.spark.{CarbonInputMetrics, Partition, TaskContext}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datamap.{DataMapRegistry, DataMapStoreManager, Segment}
+import org.apache.carbondata.core.datamap.dev.DataMapBuilder
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema, TableInfo}
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.TaskMetricsMap
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit, CarbonProjection, CarbonRecordReader}
+import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport
+import org.apache.carbondata.spark.{RefreshResult, RefreshResultImpl}
+import org.apache.carbondata.spark.rdd.{CarbonRDDWithTableInfo, CarbonSparkPartition}
+import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl
+
+/**
+ * Helper object to rebuild the index DataMap
+ */
+object IndexDataMapRebuildRDD {
+
+ /**
+ * Rebuild the datamap for all existing data in the table
+ */
+ def rebuildDataMap(
+ sparkSession: SparkSession,
+ carbonTable: CarbonTable,
+ schema: DataMapSchema
+ ): Unit = {
+ val tableIdentifier = carbonTable.getAbsoluteTableIdentifier
+ val segmentStatusManager = new SegmentStatusManager(tableIdentifier)
+ val validAndInvalidSegments = segmentStatusManager.getValidAndInvalidSegments()
+ val validSegments = validAndInvalidSegments.getValidSegments
+ val indexedCarbonColumns = carbonTable.getIndexedColumns(schema)
+
+ // loop all segments to rebuild DataMap
+ validSegments.asScala.foreach { segment =>
+ // if the lucene datamap folder already exists, there is no need to build the lucene datamap again
+ refreshOneSegment(sparkSession, carbonTable, schema.getDataMapName,
+ indexedCarbonColumns, segment.getSegmentNo);
+ }
+ }
+
+ private def refreshOneSegment(
+ sparkSession: SparkSession,
+ carbonTable: CarbonTable,
+ dataMapName: String,
+ indexColumns: java.util.List[CarbonColumn],
+ segmentId: String): Unit = {
+
+ val dataMapStorePath =
+ CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId) +
+ File.separator +
+ dataMapName
+
+ if (!FileFactory.isFileExist(dataMapStorePath)) {
+ if (FileFactory.mkdirs(dataMapStorePath, FileFactory.getFileType(dataMapStorePath))) {
+ try {
+ val status = new IndexDataMapRebuildRDD[String, Boolean](
+ sparkSession,
+ new RefreshResultImpl(),
+ carbonTable.getTableInfo,
+ dataMapName,
+ indexColumns.asScala.toArray,
+ segmentId
+ ).collect()
+
+ status.find(_._2 == false).foreach { task =>
+ throw new Exception(
+ s"Task Failed to rebuild datamap $dataMapName on segment_$segmentId")
+ }
+ } catch {
+ case ex: Throwable =>
+ // process failure
+ FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(dataMapStorePath))
+ throw new Exception(
+ s"Failed to refresh datamap $dataMapName on segment_$segmentId", ex)
+ }
+ } else {
+ throw new IOException(s"Failed to create directory $dataMapStorePath")
+ }
+ }
+ }
+
+}
+
+class OriginalReadSupport(dataTypes: Array[DataType]) extends CarbonReadSupport[Array[Object]] {
+ override def initialize(carbonColumns: Array[CarbonColumn],
+ carbonTable: CarbonTable): Unit = {
+ }
+
+ override def readRow(data: Array[Object]): Array[Object] = {
+ dataTypes.zipWithIndex.foreach { case (dataType, i) =>
+ if (dataType == DataTypes.STRING) {
+ data(i) = data(i).toString
+ }
+ }
+ data
+ }
+
+ override def close(): Unit = {
+ }
+}
+
+class IndexDataMapRebuildRDD[K, V](
+ session: SparkSession,
+ result: RefreshResult[K, V],
+ @transient tableInfo: TableInfo,
+ dataMapName: String,
+ indexColumns: Array[CarbonColumn],
+ segmentId: String
+) extends CarbonRDDWithTableInfo[(K, V)](
+ session.sparkContext, Nil, tableInfo.serialize()) {
+
+ private val dataMapSchema = DataMapStoreManager.getInstance().getDataMapSchema(dataMapName)
+ private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
+ private val jobTrackerId: String = {
+ val formatter = new SimpleDateFormat("yyyyMMddHHmm")
+ formatter.format(new util.Date())
+ }
+
+ override def internalCompute(split: Partition, context: TaskContext): Iterator[(K, V)] = {
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+ val dataMapFactory =
+ DataMapManager.get().getDataMapProvider(
+ CarbonTable.buildFromTableInfo(getTableInfo), dataMapSchema, session).getDataMapFactory
+ var status = false
+ val inputMetrics = new CarbonInputMetrics
+ TaskMetricsMap.getInstance().registerThreadCallback()
+ val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value
+ inputMetrics.initBytesReadCallback(context, inputSplit)
+
+ val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
+ val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId)
+ val format = createInputFormat(attemptContext)
+
+ val model = format.createQueryModel(inputSplit, attemptContext)
+ // one query id per table
+ model.setQueryId(queryId)
+ model.setVectorReader(false)
+ model.setForcedDetailRawQuery(false)
+ model.setRequiredRowId(true)
+
+ var reader: CarbonRecordReader[Array[Object]] = null
+ var refresher: DataMapBuilder = null
+ try {
+ reader = new CarbonRecordReader(
+ model, new OriginalReadSupport(indexColumns.map(_.getDataType)), inputMetrics)
+ reader.initialize(inputSplit, attemptContext)
+
+ // we use task name as shard name to create the folder for this datamap
+ val shardName = CarbonTablePath.getShardName(inputSplit.getAllSplits.get(0).getBlockPath)
+ refresher = dataMapFactory.createBuilder(new Segment(segmentId), shardName)
+ refresher.initialize()
+
+ var blockletId = 0
+ var firstRow = true
+ while (reader.nextKeyValue()) {
+ val rowWithPosition = reader.getCurrentValue
+ val size = rowWithPosition.length
+ val pageId = rowWithPosition(size - 2).asInstanceOf[Int]
+ val rowId = rowWithPosition(size - 1).asInstanceOf[Int]
+
+ if (!firstRow && pageId == 0 && rowId == 0) {
+ // new blocklet started, increase blockletId
+ blockletId = blockletId + 1
+ } else {
+ firstRow = false
+ }
+
+ refresher.addRow(blockletId, pageId, rowId, rowWithPosition)
+ }
+
+ refresher.finish()
+
+ status = true
+ } finally {
+ if (reader != null) {
+ try {
+ reader.close()
+ } catch {
+ case ex: Throwable =>
+ LOGGER.error(ex, "Failed to close reader")
+ }
+ }
+
+ if (refresher != null) {
+ try {
+ refresher.close()
+ } catch {
+ case ex: Throwable =>
+ LOGGER.error(ex, "Failed to close index writer")
+ }
+ }
+ }
+
+ new Iterator[(K, V)] {
+
+ var finished = false
+
+ override def hasNext: Boolean = {
+ !finished
+ }
+
+ override def next(): (K, V) = {
+ finished = true
+ result.getKey(split.index.toString, status)
+ }
+ }
+ }
+
+
+ private def createInputFormat(
+ attemptContext: TaskAttemptContextImpl) = {
+ val format = new CarbonTableInputFormat[Object]
+ val tableInfo1 = getTableInfo
+ val conf = attemptContext.getConfiguration
+ CarbonInputFormat.setTableInfo(conf, tableInfo1)
+ CarbonInputFormat.setDatabaseName(conf, tableInfo1.getDatabaseName)
+ CarbonInputFormat.setTableName(conf, tableInfo1.getFactTable.getTableName)
+ CarbonInputFormat.setDataTypeConverter(conf, classOf[SparkDataTypeConverterImpl])
+
+ val identifier = tableInfo1.getOrCreateAbsoluteTableIdentifier()
+ CarbonInputFormat.setTablePath(
+ conf,
+ identifier.appendWithLocalPrefix(identifier.getTablePath))
+
+ CarbonInputFormat.setSegmentsToAccess(
+ conf,
+ Segment.toSegmentList(Array(segmentId), null))
+
+ CarbonInputFormat.setColumnProjection(
+ conf,
+ new CarbonProjection(indexColumns.map(_.getColName)))
+ format
+ }
+
+ override protected def getPartitions = {
+ if (!dataMapSchema.isIndexDataMap) {
+ throw new UnsupportedOperationException
+ }
+ val conf = new Configuration()
+ val jobConf = new JobConf(conf)
+ SparkHadoopUtil.get.addCredentials(jobConf)
+ val job = Job.getInstance(jobConf)
+ job.getConfiguration.set("query.id", queryId)
+
+ val format = new CarbonTableInputFormat[Object]
+
+ CarbonInputFormat.setSegmentsToAccess(
+ job.getConfiguration,
+ Segment.toSegmentList(Array(segmentId), null))
+
+ CarbonInputFormat.setTableInfo(
+ job.getConfiguration,
+ tableInfo)
+ CarbonInputFormat.setTablePath(
+ job.getConfiguration,
+ tableInfo.getOrCreateAbsoluteTableIdentifier().getTablePath)
+ CarbonInputFormat.setDatabaseName(
+ job.getConfiguration,
+ tableInfo.getDatabaseName)
+ CarbonInputFormat.setTableName(
+ job.getConfiguration,
+ tableInfo.getFactTable.getTableName)
+
+ format
+ .getSplits(job)
+ .asScala
+ .map(_.asInstanceOf[CarbonInputSplit])
+ .groupBy(_.taskId)
+ .map { group =>
+ new CarbonMultiBlockSplit(
+ group._2.asJava,
+ group._2.flatMap(_.getLocations).toArray)
+ }
+ .zipWithIndex
+ .map { split =>
+ new CarbonSparkPartition(id, split._2, split._1)
+ }
+ .toArray
+ }
+}
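A note on the blocklet tracking in internalCompute above: rows carry no blocklet id, so a new blocklet is inferred whenever pageId and rowId both reset to zero after the first row. A self-contained sketch of that logic with hypothetical (pageId, rowId) pairs:

    val positions = Seq((0, 0), (0, 1), (1, 0), (0, 0), (0, 1))
    var blockletId = 0
    var firstRow = true
    positions.foreach { case (pageId, rowId) =>
      if (!firstRow && pageId == 0 && rowId == 0) {
        blockletId += 1 // (0, 0) seen again means a new blocklet started
      } else {
        firstRow = false
      }
      // yields blocklet ids 0, 0, 0, 1, 1 for the pairs above
    }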
http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRefreshRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRefreshRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRefreshRDD.scala
deleted file mode 100644
index c341c36..0000000
--- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRefreshRDD.scala
+++ /dev/null
@@ -1,317 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.datamap
-
-import java.io.{File, IOException}
-import java.text.SimpleDateFormat
-import java.util
-
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.{Job, TaskAttemptID, TaskType}
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-import org.apache.spark.{CarbonInputMetrics, Partition, TaskContext}
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.sql.SparkSession
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
-import org.apache.carbondata.core.datamap.dev.DataMapRefresher
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
-import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema, TableInfo}
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.TaskMetricsMap
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit, CarbonProjection, CarbonRecordReader}
-import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
-import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport
-import org.apache.carbondata.spark.{RefreshResult, RefreshResultImpl}
-import org.apache.carbondata.spark.rdd.{CarbonRDDWithTableInfo, CarbonSparkPartition}
-import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl
-
-/**
- * Helper object to rebuild the index DataMap
- */
-object IndexDataMapRefreshRDD {
-
- /**
- * Rebuild the datamap for all existing data in the table
- */
- def rebuildDataMap(
- sparkSession: SparkSession,
- carbonTable: CarbonTable,
- schema: DataMapSchema
- ): Unit = {
- val tableIdentifier = carbonTable.getAbsoluteTableIdentifier
- val segmentStatusManager = new SegmentStatusManager(tableIdentifier)
- val validAndInvalidSegments = segmentStatusManager.getValidAndInvalidSegments()
- val validSegments = validAndInvalidSegments.getValidSegments
- val indexedCarbonColumns = carbonTable.getIndexedColumns(schema)
-
- // loop all segments to rebuild DataMap
- validSegments.asScala.foreach { segment =>
- // if lucene datamap folder is exists, not require to build lucene datamap again
- refreshOneSegment(sparkSession, carbonTable, schema.getDataMapName,
- indexedCarbonColumns, segment.getSegmentNo);
- }
- }
-
- private def refreshOneSegment(
- sparkSession: SparkSession,
- carbonTable: CarbonTable,
- dataMapName: String,
- indexColumns: java.util.List[CarbonColumn],
- segmentId: String): Unit = {
-
- val dataMapStorePath =
- CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId) +
- File.separator +
- dataMapName
-
- if (!FileFactory.isFileExist(dataMapStorePath)) {
- if (FileFactory.mkdirs(dataMapStorePath, FileFactory.getFileType(dataMapStorePath))) {
- try {
- val status = new IndexDataMapRefreshRDD[String, Boolean](
- sparkSession,
- new RefreshResultImpl(),
- carbonTable.getTableInfo,
- dataMapName,
- indexColumns.asScala.toArray,
- segmentId
- ).collect()
-
- status.find(_._2 == false).foreach { task =>
- throw new Exception(
- s"Task Failed to refresh datamap $dataMapName on segment_$segmentId")
- }
- } catch {
- case ex: Throwable =>
- // process failure
- FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(dataMapStorePath))
- throw new Exception(
- s"Failed to refresh datamap $dataMapName on segment_$segmentId", ex)
- }
- } else {
- throw new IOException(s"Failed to create directory $dataMapStorePath")
- }
- }
- }
-
-}
-
-class OriginalReadSupport(dataTypes: Array[DataType]) extends CarbonReadSupport[Array[Object]] {
- override def initialize(carbonColumns: Array[CarbonColumn],
- carbonTable: CarbonTable): Unit = {
- }
-
- override def readRow(data: Array[Object]): Array[Object] = {
- dataTypes.zipWithIndex.foreach { case (dataType, i) =>
- if (dataType == DataTypes.STRING) {
- data(i) = data(i).toString
- }
- }
- data
- }
-
- override def close(): Unit = {
- }
-}
-
-class IndexDataMapRefreshRDD[K, V](
- session: SparkSession,
- result: RefreshResult[K, V],
- @transient tableInfo: TableInfo,
- dataMapName: String,
- indexColumns: Array[CarbonColumn],
- segmentId: String
-) extends CarbonRDDWithTableInfo[(K, V)](
- session.sparkContext, Nil, tableInfo.serialize()) {
-
- private val dataMapSchema = DataMapStoreManager.getInstance().getDataMapSchema(dataMapName)
- private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
- private val jobTrackerId: String = {
- val formatter = new SimpleDateFormat("yyyyMMddHHmm")
- formatter.format(new util.Date())
- }
-
- override def internalCompute(split: Partition, context: TaskContext): Iterator[(K, V)] = {
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
- val dataMapFactory =
- DataMapManager.get().getDataMapProvider(
- CarbonTable.buildFromTableInfo(getTableInfo), dataMapSchema, session).getDataMapFactory
- var status = false
- val inputMetrics = new CarbonInputMetrics
- TaskMetricsMap.getInstance().registerThreadCallback()
- val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value
- inputMetrics.initBytesReadCallback(context, inputSplit)
-
- val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
- val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId)
- val format = createInputFormat(attemptContext)
-
- val model = format.createQueryModel(inputSplit, attemptContext)
- // one query id per table
- model.setQueryId(queryId)
- model.setVectorReader(false)
- model.setForcedDetailRawQuery(false)
- model.setRequiredRowId(true)
-
- var reader: CarbonRecordReader[Array[Object]] = null
- var refresher: DataMapRefresher = null
- try {
- reader = new CarbonRecordReader(
- model, new OriginalReadSupport(indexColumns.map(_.getDataType)), inputMetrics)
- reader.initialize(inputSplit, attemptContext)
-
- // we use task name as shard name to create the folder for this datamap
- val shardName = CarbonTablePath.getShardName(inputSplit.getAllSplits.get(0).getBlockPath)
- refresher = dataMapFactory.createRefresher(new Segment(segmentId), shardName)
- refresher.initialize()
-
- var blockletId = 0
- var firstRow = true
- while (reader.nextKeyValue()) {
- val rowWithPosition = reader.getCurrentValue
- val size = rowWithPosition.length
- val pageId = rowWithPosition(size - 2).asInstanceOf[Int]
- val rowId = rowWithPosition(size - 1).asInstanceOf[Int]
-
- if (!firstRow && pageId == 0 && rowId == 0) {
- // new blocklet started, increase blockletId
- blockletId = blockletId + 1
- } else {
- firstRow = false
- }
-
- refresher.addRow(blockletId, pageId, rowId, rowWithPosition)
- }
-
- refresher.finish()
-
- status = true
- } finally {
- if (reader != null) {
- try {
- reader.close()
- } catch {
- case ex: Throwable =>
- LOGGER.error(ex, "Failed to close reader")
- }
- }
-
- if (refresher != null) {
- try {
- refresher.close()
- } catch {
- case ex: Throwable =>
- LOGGER.error(ex, "Failed to close index writer")
- }
- }
- }
-
- new Iterator[(K, V)] {
-
- var finished = false
-
- override def hasNext: Boolean = {
- !finished
- }
-
- override def next(): (K, V) = {
- finished = true
- result.getKey(split.index.toString, status)
- }
- }
- }
-
-
- private def createInputFormat(
- attemptContext: TaskAttemptContextImpl) = {
- val format = new CarbonTableInputFormat[Object]
- val tableInfo1 = getTableInfo
- val conf = attemptContext.getConfiguration
- CarbonInputFormat.setTableInfo(conf, tableInfo1)
- CarbonInputFormat.setDatabaseName(conf, tableInfo1.getDatabaseName)
- CarbonInputFormat.setTableName(conf, tableInfo1.getFactTable.getTableName)
- CarbonInputFormat.setDataTypeConverter(conf, classOf[SparkDataTypeConverterImpl])
-
- val identifier = tableInfo1.getOrCreateAbsoluteTableIdentifier()
- CarbonInputFormat.setTablePath(
- conf,
- identifier.appendWithLocalPrefix(identifier.getTablePath))
-
- CarbonInputFormat.setSegmentsToAccess(
- conf,
- Segment.toSegmentList(Array(segmentId), null))
-
- CarbonInputFormat.setColumnProjection(
- conf,
- new CarbonProjection(indexColumns.map(_.getColName)))
- format
- }
-
- override protected def getPartitions = {
- if (!dataMapSchema.isIndexDataMap) {
- throw new UnsupportedOperationException
- }
- val conf = new Configuration()
- val jobConf = new JobConf(conf)
- SparkHadoopUtil.get.addCredentials(jobConf)
- val job = Job.getInstance(jobConf)
- job.getConfiguration.set("query.id", queryId)
-
- val format = new CarbonTableInputFormat[Object]
-
- CarbonInputFormat.setSegmentsToAccess(
- job.getConfiguration,
- Segment.toSegmentList(Array(segmentId), null))
-
- CarbonInputFormat.setTableInfo(
- job.getConfiguration,
- tableInfo)
- CarbonInputFormat.setTablePath(
- job.getConfiguration,
- tableInfo.getOrCreateAbsoluteTableIdentifier().getTablePath)
- CarbonInputFormat.setDatabaseName(
- job.getConfiguration,
- tableInfo.getDatabaseName)
- CarbonInputFormat.setTableName(
- job.getConfiguration,
- tableInfo.getFactTable.getTableName)
-
- format
- .getSplits(job)
- .asScala
- .map(_.asInstanceOf[CarbonInputSplit])
- .groupBy(_.taskId)
- .map { group =>
- new CarbonMultiBlockSplit(
- group._2.asJava,
- group._2.flatMap(_.getLocations).toArray)
- }
- .zipWithIndex
- .map { split =>
- new CarbonSparkPartition(id, split._2, split._1)
- }
- .toArray
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 42c9c25..bdbaef5 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -848,11 +848,11 @@ object CarbonDataRDDFactory {
val errorMessage = s"Dataload failed due to failure in table status updation for" +
s" ${carbonLoadModel.getTableName}"
LOGGER.audit("Data load is failed for " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}")
LOGGER.error("Dataload failed due to failure in table status updation.")
throw new Exception(errorMessage)
} else {
- DataMapStatusManager.disableDataMapsOfTable(carbonTable)
+ DataMapStatusManager.disableAllLazyDataMaps(carbonTable)
}
done
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index 5fac5a8..497f95a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.spark.sql.profiler.{Profiler, SQLStart}
import org.apache.spark.util.{CarbonReflectionUtils, Utils}
+import org.apache.carbondata.common.annotations.InterfaceAudience
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.scan.expression.LiteralExpression
@@ -110,6 +111,16 @@ class CarbonSession(@transient val sc: SparkContext,
)
}
+ /**
+ * Returns true if the specified SQL statement will hit the datamap.
+ * This API is for testing purposes only.
+ */
+ @InterfaceAudience.Developer(Array("DataMap"))
+ def isDataMapHit(sqlStatement: String, dataMapName: String): Boolean = {
+ val message = sql(s"EXPLAIN $sqlStatement").collect()
+ message(0).getString(0).contains(dataMapName)
+ }
+
def isSearchModeEnabled: Boolean = carbonStore != null
/**
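A usage sketch for the isDataMapHit helper above (the session value, query, and datamap name are illustrative):

    val carbon = sparkSession.asInstanceOf[CarbonSession]
    // runs EXPLAIN on the statement and checks whether the plan mentions the datamap
    assert(carbon.isDataMapHit("SELECT * FROM t WHERE name = 'a'", "dm"))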
http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index 34a4013..25589d4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -16,6 +16,8 @@
*/
package org.apache.spark.sql.execution.command.datamap
+import java.util
+
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -27,7 +29,7 @@ import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandExcept
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.datamap.{DataMapProvider, DataMapStoreManager}
import org.apache.carbondata.core.datamap.status.DataMapStatusManager
-import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
+import org.apache.carbondata.core.metadata.schema.datamap.{DataMapClassProvider, DataMapProperty}
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
import org.apache.carbondata.datamap.{DataMapManager, IndexDataMapProvider}
@@ -41,7 +43,8 @@ case class CarbonCreateDataMapCommand(
dmProviderName: String,
dmProperties: Map[String, String],
queryString: Option[String],
- ifNotExistsSet: Boolean = false)
+ ifNotExistsSet: Boolean = false,
+ deferredRebuild: Boolean = false)
extends AtomicRunnableCommand {
private var dataMapProvider: DataMapProvider = _
@@ -78,9 +81,16 @@ case class CarbonCreateDataMapCommand(
}
dataMapSchema = new DataMapSchema(dataMapName, dmProviderName)
- dataMapSchema.setProperties(new java.util.HashMap[String, String](
- dmProperties.map(x => (x._1.trim, x._2.trim)).asJava))
+ val property = dmProperties.map(x => (x._1.trim, x._2.trim)).asJava
+ val javaMap = new java.util.HashMap[String, String](property)
+ javaMap.put(DataMapProperty.DEFERRED_REBUILD, deferredRebuild.toString)
+ dataMapSchema.setProperties(javaMap)
+
+ if (dataMapSchema.isIndexDataMap && mainTable == null) {
+ throw new MalformedDataMapCommandException(
+ "For this datamap, main table is required. Use `CREATE DATAMAP ... ON TABLE ...` ")
+ }
dataMapProvider = DataMapManager.get.getDataMapProvider(mainTable, dataMapSchema, sparkSession)
// If it is index datamap, check whether the column has datamap created already
@@ -101,6 +111,10 @@ case class CarbonCreateDataMapCommand(
dataMapProvider.initMeta(queryString.orNull)
DataMapStatusManager.disableDataMap(dataMapName)
case _ =>
+ if (deferredRebuild) {
+ throw new MalformedDataMapCommandException(
+ "DEFERRED REBUILD is not supported on this DataMap")
+ }
dataMapProvider.initMeta(queryString.orNull)
}
val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -111,10 +125,11 @@ case class CarbonCreateDataMapCommand(
override def processData(sparkSession: SparkSession): Seq[Row] = {
if (dataMapProvider != null) {
dataMapProvider.initData()
- if (mainTable != null &&
- mainTable.isAutoRefreshDataMap &&
- !dataMapSchema.isIndexDataMap) {
+ if (mainTable != null && !deferredRebuild) {
dataMapProvider.rebuild()
+ if (dataMapSchema.isIndexDataMap) {
+ DataMapStatusManager.enableDataMap(dataMapName)
+ }
}
}
Seq.empty
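Net effect of the processData change above, matching the status tests earlier in this commit: without WITH DEFERRED REBUILD an index datamap is rebuilt during creation and enabled; with it, the datamap stays disabled until an explicit rebuild. Sketch (the provider string is elided, names illustrative):

    sql("CREATE DATAMAP dm1 ON TABLE t USING '...' DMPROPERTIES('index_columns'='name')")
    // dm1 is rebuilt immediately and its status becomes ENABLED

    sql("CREATE DATAMAP dm2 ON TABLE t USING '...' WITH DEFERRED REBUILD " +
      "DMPROPERTIES('index_columns'='name')")
    // dm2 stays DISABLED until:
    sql("REBUILD DATAMAP dm2 ON TABLE t")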
http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala
new file mode 100644
index 0000000..6493c83
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.datamap
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.DataCommand
+
+import org.apache.carbondata.core.datamap.{DataMapRegistry, DataMapStoreManager}
+import org.apache.carbondata.core.datamap.status.DataMapStatusManager
+import org.apache.carbondata.datamap.{DataMapManager, IndexDataMapRebuildRDD}
+
+/**
+ * Rebuild the datamap by syncing it with the main table's data. After a successful sync it
+ * enables the datamap.
+ */
+case class CarbonDataMapRebuildCommand(
+ dataMapName: String,
+ tableIdentifier: Option[TableIdentifier]) extends DataCommand {
+
+ override def processData(sparkSession: SparkSession): Seq[Row] = {
+ val schema = DataMapStoreManager.getInstance().getDataMapSchema(dataMapName)
+
+ val table = tableIdentifier match {
+ case Some(identifier) =>
+ CarbonEnv.getCarbonTable(identifier)(sparkSession)
+ case _ =>
+ CarbonEnv.getCarbonTable(
+ Option(schema.getRelationIdentifier.getDatabaseName),
+ schema.getRelationIdentifier.getTableName
+ )(sparkSession)
+ }
+ val provider = DataMapManager.get().getDataMapProvider(table, schema, sparkSession)
+ provider.rebuild()
+
+ // After a successful rebuild, enable the datamap.
+ DataMapStatusManager.enableDataMap(dataMapName)
+ Seq.empty
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRefreshCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRefreshCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRefreshCommand.scala
deleted file mode 100644
index 4f3b7bc..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRefreshCommand.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command.datamap
-
-import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.command.DataCommand
-
-import org.apache.carbondata.core.datamap.{DataMapRegistry, DataMapStoreManager}
-import org.apache.carbondata.core.datamap.status.DataMapStatusManager
-import org.apache.carbondata.datamap.{DataMapManager, IndexDataMapRefreshRDD}
-
-/**
- * Refresh the datamaps through sync with main table data. After sync with parent table's it enables
- * the datamap.
- */
-case class CarbonDataMapRefreshCommand(
- dataMapName: String,
- tableIdentifier: Option[TableIdentifier]) extends DataCommand {
-
- override def processData(sparkSession: SparkSession): Seq[Row] = {
- val schema = DataMapStoreManager.getInstance().getDataMapSchema(dataMapName)
-
- val table = tableIdentifier match {
- case Some(identifier) =>
- CarbonEnv.getCarbonTable(identifier)(sparkSession)
- case _ =>
- CarbonEnv.getCarbonTable(
- Option(schema.getRelationIdentifier.getDatabaseName),
- schema.getRelationIdentifier.getTableName
- )(sparkSession)
- }
- val provider = DataMapManager.get().getDataMapProvider(table, schema, sparkSession)
- provider.rebuild()
-
- // After sync success enable the datamap.
- DataMapStatusManager.enableDataMap(dataMapName)
- Seq.empty
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
index 1bb7d7c..4f60297 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
@@ -231,8 +231,9 @@ class SparkCarbonFileFormat extends FileFormat
val tab = model.getTable
DataMapStoreManager.getInstance().clearDataMaps(identifier)
- val dataMapExprWrapper = DataMapChooser.get
- .choose(tab, model.getFilterExpressionResolverTree)
+
+ val dataMapExprWrapper = new DataMapChooser(tab).choose(
+ model.getFilterExpressionResolverTree)
// TODO : handle the partition for CarbonFileLevelFormat
val prunedBlocklets = dataMapExprWrapper.prune(segments, null)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 62da7ed..9dd8105 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.{CarbonDDLSqlParser, TableIdentifier}
import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.command.datamap.{CarbonCreateDataMapCommand, CarbonDataMapRefreshCommand, CarbonDataMapShowCommand, CarbonDropDataMapCommand}
+import org.apache.spark.sql.execution.command.datamap.{CarbonCreateDataMapCommand, CarbonDataMapRebuildCommand, CarbonDataMapShowCommand, CarbonDropDataMapCommand}
import org.apache.spark.sql.execution.command.management._
import org.apache.spark.sql.execution.command.partition.{CarbonAlterTableDropPartitionCommand, CarbonAlterTableSplitPartitionCommand}
import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand, CarbonAlterTableDropColumnCommand}
@@ -147,19 +147,23 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
/**
* The syntax of datamap creation is as follows.
- * CREATE DATAMAP IF NOT EXISTS datamapName ON TABLE tableName USING 'DataMapProviderName'
+ * CREATE DATAMAP IF NOT EXISTS datamapName [ON TABLE tableName]
+ * USING 'DataMapProviderName'
+ * [WITH DEFERRED REBUILD]
* DMPROPERTIES('KEY'='VALUE') AS SELECT COUNT(COL1) FROM tableName
*/
protected lazy val createDataMap: Parser[LogicalPlan] =
CREATE ~> DATAMAP ~> opt(IF ~> NOT ~> EXISTS) ~ ident ~
opt(ontable) ~
- (USING ~> stringLit) ~ (DMPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? ~
+ (USING ~> stringLit) ~
+ opt(WITH ~> DEFERRED ~> REBUILD) ~
+ (DMPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? ~
(AS ~> restInput).? <~ opt(";") ^^ {
- case ifnotexists ~ dmname ~ tableIdent ~ dmProviderName ~ dmprops ~ query =>
+ case ifnotexists ~ dmname ~ tableIdent ~ dmProviderName ~ deferred ~ dmprops ~ query =>
val map = dmprops.getOrElse(List[(String, String)]()).toMap[String, String]
CarbonCreateDataMapCommand(dmname, tableIdent, dmProviderName, map, query,
- ifnotexists.isDefined)
+ ifnotexists.isDefined, deferred.isDefined)
}
protected lazy val ontable: Parser[TableIdentifier] =
@@ -190,12 +194,12 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
/**
 * The syntax of datamap rebuild is as follows.
- * REFRESH DATAMAP datamapname [ON TABLE] tableName
+ * REBUILD DATAMAP datamapname [ON TABLE] tableName
*/
protected lazy val refreshDataMap: Parser[LogicalPlan] =
- REFRESH ~> DATAMAP ~> ident ~ opt(ontable) <~ opt(";") ^^ {
+ REBUILD ~> DATAMAP ~> ident ~ opt(ontable) <~ opt(";") ^^ {
case datamap ~ tableIdent =>
- CarbonDataMapRefreshCommand(datamap, tableIdent)
+ CarbonDataMapRebuildCommand(datamap, tableIdent)
}
protected lazy val deleteRecords: Parser[LogicalPlan] =
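Taken together, the two grammar changes accept statements like the following sketch (table, datamap and provider names are illustrative, in the style of the test suites in this commit):

    sql(
      """
        | CREATE DATAMAP dm ON TABLE sample_table
        | USING 'bloomfilter'
        | WITH DEFERRED REBUILD
        | DMPROPERTIES('INDEX_COLUMNS'='city')
      """.stripMargin)

    // the datamap stays disabled until it is rebuilt explicitly
    sql("REBUILD DATAMAP dm ON TABLE sample_table")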
http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
index a8e7b6c..7df3901 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
@@ -22,44 +22,56 @@ import java.util.UUID
import scala.util.Random
+import org.apache.spark.sql.{CarbonSession, DataFrame}
import org.apache.spark.sql.test.util.QueryTest
-import org.scalatest.BeforeAndAfterAll
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+import org.apache.carbondata.core.datamap.status.DataMapStatusManager
import org.apache.carbondata.core.util.CarbonProperties
-class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
- val inputFile = s"$resourcesPath/bloom_datamap_input.csv"
+class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
+ val bigFile = s"$resourcesPath/bloom_datamap_input_big.csv"
+ val smallFile = s"$resourcesPath/bloom_datamap_input_small.csv"
val normalTable = "carbon_normal"
val bloomDMSampleTable = "carbon_bloom"
val dataMapName = "bloom_dm"
- val lineNum = 500000
override protected def beforeAll(): Unit = {
new File(CarbonProperties.getInstance().getSystemFolderLocation).delete()
- createFile(inputFile, line = lineNum, start = 0)
+ createFile(bigFile, line = 500000)
+ createFile(smallFile)
sql(s"DROP TABLE IF EXISTS $normalTable")
sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable")
}
- private def checkQuery = {
+ override def afterEach(): Unit = {
+ sql(s"DROP TABLE IF EXISTS $normalTable")
+ sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable")
+ }
+
+ private def checkSqlHitDataMap(sqlText: String, dataMapName: String, shouldHit: Boolean): DataFrame = {
+ if (shouldHit) {
+ assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isDataMapHit(sqlText, dataMapName))
+ } else {
+ assert(!sqlContext.sparkSession.asInstanceOf[CarbonSession].isDataMapHit(sqlText, dataMapName))
+ }
+ sql(sqlText)
+ }
+
+ private def checkQuery(dataMapName: String, shouldHit: Boolean = true) = {
checkAnswer(
- sql(s"select * from $bloomDMSampleTable where id = 1"),
+ checkSqlHitDataMap(s"select * from $bloomDMSampleTable where id = 1", dataMapName, shouldHit),
sql(s"select * from $normalTable where id = 1"))
checkAnswer(
- sql(s"select * from $bloomDMSampleTable where id = 999"),
+ checkSqlHitDataMap(s"select * from $bloomDMSampleTable where id = 999", dataMapName, shouldHit),
sql(s"select * from $normalTable where id = 999"))
checkAnswer(
- sql(s"select * from $bloomDMSampleTable where city = 'city_1'"),
+ checkSqlHitDataMap(s"select * from $bloomDMSampleTable where city = 'city_1'", dataMapName, shouldHit),
sql(s"select * from $normalTable where city = 'city_1'"))
checkAnswer(
- sql(s"select * from $bloomDMSampleTable where city = 'city_999'"),
+ checkSqlHitDataMap(s"select * from $bloomDMSampleTable where city = 'city_999'", dataMapName, shouldHit),
sql(s"select * from $normalTable where city = 'city_999'"))
- checkAnswer(
- sql(s"select count(distinct id), count(distinct name), count(distinct city)," +
- s" count(distinct s1), count(distinct s2) from $bloomDMSampleTable"),
- sql(s"select count(distinct id), count(distinct name), count(distinct city)," +
- s" count(distinct s1), count(distinct s2) from $normalTable"))
- checkAnswer(
+ checkAnswer(
sql(s"select min(id), max(id), min(name), max(name), min(city), max(city)" +
s" from $bloomDMSampleTable"),
sql(s"select min(id), max(id), min(name), max(name), min(city), max(city)" +
@@ -86,28 +98,34 @@ class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
| DMProperties('INDEX_COLUMNS'='city,id', 'BLOOM_SIZE'='640000')
""".stripMargin)
+ var map = DataMapStatusManager.readDataMapStatusMap()
+ assert(map.get(dataMapName).isEnabled)
+
// load two segments
(1 to 2).foreach { i =>
sql(
s"""
- | LOAD DATA LOCAL INPATH '$inputFile' INTO TABLE $normalTable
+ | LOAD DATA LOCAL INPATH '$bigFile' INTO TABLE $normalTable
| OPTIONS('header'='false')
""".stripMargin)
sql(
s"""
- | LOAD DATA LOCAL INPATH '$inputFile' INTO TABLE $bloomDMSampleTable
+ | LOAD DATA LOCAL INPATH '$bigFile' INTO TABLE $bloomDMSampleTable
| OPTIONS('header'='false')
""".stripMargin)
}
+ map = DataMapStatusManager.readDataMapStatusMap()
+ assert(map.get(dataMapName).isEnabled)
+
sql(s"SHOW DATAMAP ON TABLE $bloomDMSampleTable").show(false)
checkExistence(sql(s"SHOW DATAMAP ON TABLE $bloomDMSampleTable"), true, dataMapName)
- checkQuery
+ checkQuery(dataMapName)
sql(s"DROP TABLE IF EXISTS $normalTable")
sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable")
}
- test("test create bloom datamap and refresh datamap") {
+ test("test create bloom datamap and REBUILD DATAMAP") {
sql(
s"""
| CREATE TABLE $normalTable(id INT, name STRING, city STRING, age INT,
@@ -125,12 +143,12 @@ class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
(1 to 2).foreach { i =>
sql(
s"""
- | LOAD DATA LOCAL INPATH '$inputFile' INTO TABLE $normalTable
+ | LOAD DATA LOCAL INPATH '$bigFile' INTO TABLE $normalTable
| OPTIONS('header'='false')
""".stripMargin)
sql(
s"""
- | LOAD DATA LOCAL INPATH '$inputFile' INTO TABLE $bloomDMSampleTable
+ | LOAD DATA LOCAL INPATH '$bigFile' INTO TABLE $bloomDMSampleTable
| OPTIONS('header'='false')
""".stripMargin)
}
@@ -142,18 +160,135 @@ class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
| DMProperties('INDEX_COLUMNS'='city,id', 'BLOOM_SIZE'='640000')
""".stripMargin)
- sql(s"REFRESH DATAMAP $dataMapName ON TABLE $bloomDMSampleTable")
sql(s"SHOW DATAMAP ON TABLE $bloomDMSampleTable").show(false)
checkExistence(sql(s"SHOW DATAMAP ON TABLE $bloomDMSampleTable"), true, dataMapName)
- checkQuery
+ checkQuery(dataMapName)
+ sql(s"DROP TABLE IF EXISTS $normalTable")
+ sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable")
+ }
+
+ test("test create bloom datamap with DEFERRED REBUILD, query hit datamap") {
+ sql(
+ s"""
+ | CREATE TABLE $normalTable(id INT, name STRING, city STRING, age INT,
+ | s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 STRING, s8 STRING)
+ | STORED BY 'carbondata' TBLPROPERTIES('table_blocksize'='128')
+ | """.stripMargin)
+ sql(
+ s"""
+ | CREATE TABLE $bloomDMSampleTable(id INT, name STRING, city STRING, age INT,
+ | s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 STRING, s8 STRING)
+ | STORED BY 'carbondata' TBLPROPERTIES('table_blocksize'='128')
+ | """.stripMargin)
+
+ sql(
+ s"""
+ | LOAD DATA LOCAL INPATH '$smallFile' INTO TABLE $normalTable
+ | OPTIONS('header'='false')
+ """.stripMargin)
+ sql(
+ s"""
+ | LOAD DATA LOCAL INPATH '$smallFile' INTO TABLE $bloomDMSampleTable
+ | OPTIONS('header'='false')
+ """.stripMargin)
+
+ sql(
+ s"""
+ | CREATE DATAMAP $dataMapName ON TABLE $bloomDMSampleTable
+ | USING 'bloomfilter'
+ | WITH DEFERRED REBUILD
+ | DMProperties('INDEX_COLUMNS'='city,id', 'BLOOM_SIZE'='640000')
+ """.stripMargin)
+
+ var map = DataMapStatusManager.readDataMapStatusMap()
+ assert(!map.get(dataMapName).isEnabled)
+
+ sql(
+ s"""
+ | LOAD DATA LOCAL INPATH '$smallFile' INTO TABLE $normalTable
+ | OPTIONS('header'='false')
+ """.stripMargin)
+ sql(
+ s"""
+ | LOAD DATA LOCAL INPATH '$smallFile' INTO TABLE $bloomDMSampleTable
+ | OPTIONS('header'='false')
+ """.stripMargin)
+
+ map = DataMapStatusManager.readDataMapStatusMap()
+ assert(!map.get(dataMapName).isEnabled)
+
+ // once we rebuild, the datamap should be enabled
+ sql(s"REBUILD DATAMAP $dataMapName ON TABLE $bloomDMSampleTable")
+ map = DataMapStatusManager.readDataMapStatusMap()
+ assert(map.get(dataMapName).isEnabled)
+
+ sql(s"SHOW DATAMAP ON TABLE $bloomDMSampleTable").show(false)
+ checkExistence(sql(s"SHOW DATAMAP ON TABLE $bloomDMSampleTable"), true, dataMapName)
+ checkQuery(dataMapName)
+
+ // once we load again, the datamap should be disabled, since it is lazy
+ sql(
+ s"""
+ | LOAD DATA LOCAL INPATH '$smallFile' INTO TABLE $normalTable
+ | OPTIONS('header'='false')
+ """.stripMargin)
+ sql(
+ s"""
+ | LOAD DATA LOCAL INPATH '$smallFile' INTO TABLE $bloomDMSampleTable
+ | OPTIONS('header'='false')
+ """.stripMargin)
+ map = DataMapStatusManager.readDataMapStatusMap()
+ assert(!map.get(dataMapName).isEnabled)
+ checkQuery(dataMapName, shouldHit = false)
+
sql(s"DROP TABLE IF EXISTS $normalTable")
sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable")
}
- // todo: will add more tests on bloom datamap, such as exception, delete datamap, show profiler
+ test("test create bloom datamap with DEFERRED REBUILD, query not hit datamap") {
+ sql(
+ s"""
+ | CREATE TABLE $normalTable(id INT, name STRING, city STRING, age INT,
+ | s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 STRING, s8 STRING)
+ | STORED BY 'carbondata' TBLPROPERTIES('table_blocksize'='128')
+ | """.stripMargin)
+ sql(
+ s"""
+ | CREATE TABLE $bloomDMSampleTable(id INT, name STRING, city STRING, age INT,
+ | s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 STRING, s8 STRING)
+ | STORED BY 'carbondata' TBLPROPERTIES('table_blocksize'='128')
+ | """.stripMargin)
+
+ sql(
+ s"""
+ | LOAD DATA LOCAL INPATH '$smallFile' INTO TABLE $normalTable
+ | OPTIONS('header'='false')
+ """.stripMargin)
+ sql(
+ s"""
+ | LOAD DATA LOCAL INPATH '$smallFile' INTO TABLE $bloomDMSampleTable
+ | OPTIONS('header'='false')
+ """.stripMargin)
+
+ sql(
+ s"""
+ | CREATE DATAMAP $dataMapName ON TABLE $bloomDMSampleTable
+ | USING 'bloomfilter'
+ | WITH DEFERRED REBUILD
+ | DMProperties('INDEX_COLUMNS'='city,id', 'BLOOM_SIZE'='640000')
+ """.stripMargin)
+
+ checkExistence(sql(s"SHOW DATAMAP ON TABLE $bloomDMSampleTable"), true, dataMapName)
+
+ // the datamap has not been rebuilt yet, so queries should not hit it
+ checkQuery(dataMapName, shouldHit = false)
+ sql(s"DROP TABLE IF EXISTS $normalTable")
+ sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable")
+ }
override protected def afterAll(): Unit = {
- deleteFile(inputFile)
+ deleteFile(bigFile)
+ deleteFile(smallFile)
sql(s"DROP TABLE IF EXISTS $normalTable")
sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable")
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index 9c3d5d6..3dc34d3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -61,8 +61,12 @@ public class DataMapWriterListener {
}
if (tableIndices != null) {
for (TableDataMap tableDataMap : tableIndices) {
- DataMapFactory factory = tableDataMap.getDataMapFactory();
- register(factory, segmentId, taskNo);
+ // register the writer only if the datamap is not lazy; for a lazy
+ // datamap, the user will rebuild it manually
+ if (!tableDataMap.getDataMapSchema().isLazy()) {
+ DataMapFactory factory = tableDataMap.getDataMapFactory();
+ register(factory, segmentId, taskNo);
+ }
}
}
}
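The same gate can be read as the following Scala sketch (a hedged illustration, assuming scala.collection.JavaConverters._ is imported; tableIndices, segmentId and taskNo are the ones from this listener):

    // only eager (non-lazy) datamaps get their writers registered during
    // data load; a lazy datamap (created WITH DEFERRED REBUILD) is populated
    // only when the user runs REBUILD DATAMAP
    tableIndices.asScala
      .filterNot(_.getDataMapSchema.isLazy)
      .foreach(dm => register(dm.getDataMapFactory, segmentId, taskNo))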
http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
----------------------------------------------------------------------
diff --git a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
index 957e9f8..35acb17 100644
--- a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
+++ b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
@@ -21,12 +21,11 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.Objects;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datamap.DataMapChooser;
-import org.apache.carbondata.core.datamap.DataMapLevel;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
@@ -104,8 +103,10 @@ public class SearchRequestHandler {
LOG.info(String.format("[SearchId:%d] %s, number of block: %d",
request.searchId(), queryModel.toString(), mbSplit.getAllSplits().size()));
- // If there is FGDataMap, prune the split by applying FGDataMap
- queryModel = tryPruneByFGDataMap(request.searchId(), table, queryModel, mbSplit);
+ // If a DataMap was selected in the Master, prune the split with it
+ if (request.dataMap() != null && request.dataMap().isDefined()) {
+ queryModel = prune(request.searchId(), table, queryModel, mbSplit, request.dataMap().get());
+ }
// In search mode, reader will read multiple blocks by using a thread pool
CarbonRecordReader<CarbonRow> reader =
@@ -135,35 +136,32 @@ public class SearchRequestHandler {
 * If there is a DataMap chosen for this table and a filter condition in the query,
 * prune the splits by the DataMap, set the pruned splits into the QueryModel, and return it
*/
- private QueryModel tryPruneByFGDataMap(int queryId,
- CarbonTable table, QueryModel queryModel, CarbonMultiBlockSplit mbSplit) throws IOException {
- DataMapExprWrapper wrapper =
- DataMapChooser.get().choose(table, queryModel.getFilterExpressionResolverTree());
-
- if (wrapper.getDataMapLevel() == DataMapLevel.FG) {
- List<Segment> segments = new LinkedList<>();
- for (CarbonInputSplit split : mbSplit.getAllSplits()) {
- segments.add(Segment.toSegment(
- split.getSegmentId(), new LatestFilesReadCommittedScope(table.getTablePath())));
- }
- List<ExtendedBlocklet> prunnedBlocklets = wrapper.prune(segments, null);
+ private QueryModel prune(int queryId, CarbonTable table, QueryModel queryModel,
+ CarbonMultiBlockSplit mbSplit, DataMapExprWrapper datamap) throws IOException {
+ Objects.requireNonNull(datamap);
+ List<Segment> segments = new LinkedList<>();
+ for (CarbonInputSplit split : mbSplit.getAllSplits()) {
+ segments.add(
+ Segment.toSegment(split.getSegmentId(),
+ new LatestFilesReadCommittedScope(table.getTablePath())));
+ }
+ List<ExtendedBlocklet> prunnedBlocklets = datamap.prune(segments, null);
- List<String> pathToRead = new LinkedList<>();
- for (ExtendedBlocklet prunnedBlocklet : prunnedBlocklets) {
- pathToRead.add(prunnedBlocklet.getPath());
- }
+ List<String> pathToRead = new LinkedList<>();
+ for (ExtendedBlocklet prunnedBlocklet : prunnedBlocklets) {
+ pathToRead.add(prunnedBlocklet.getPath());
+ }
- List<TableBlockInfo> blocks = queryModel.getTableBlockInfos();
- List<TableBlockInfo> blockToRead = new LinkedList<>();
- for (TableBlockInfo block : blocks) {
- if (pathToRead.contains(block.getFilePath())) {
- blockToRead.add(block);
- }
+ List<TableBlockInfo> blocks = queryModel.getTableBlockInfos();
+ List<TableBlockInfo> blockToRead = new LinkedList<>();
+ for (TableBlockInfo block : blocks) {
+ if (pathToRead.contains(block.getFilePath())) {
+ blockToRead.add(block);
}
- LOG.info(String.format("[SearchId:%d] pruned using FG DataMap, pruned blocks: %d",
- queryId, blockToRead.size()));
- queryModel.setTableBlockInfos(blockToRead);
}
+ LOG.info(String.format("[SearchId:%d] pruned using FG DataMap, pruned blocks: %d", queryId,
+ blockToRead.size()));
+ queryModel.setTableBlockInfos(blockToRead);
return queryModel;
}
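Since dataMap() is a Scala Option surfaced to Java, a plain null check does not cover the None case, hence the isDefined() guard above. The worker-side dispatch can be read as this hedged Scala sketch (names follow the diff):

    // the master has already chosen the DataMap (if any) and shipped it in
    // the SearchRequest; the worker only applies it to the splits it received
    val prunedModel = request.dataMap() match {
      case Some(wrapper) => prune(request.searchId(), table, queryModel, mbSplit, wrapper)
      case _             => queryModel  // no DataMap chosen, scan all blocks
    }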
http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
index 5b31a49..2e9a532 100644
--- a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
+++ b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
@@ -38,12 +38,15 @@ import org.apache.spark.util.ThreadUtils
import org.apache.carbondata.common.annotations.InterfaceAudience
import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datamap.DataMapChooser
+import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper
import org.apache.carbondata.core.datastore.block.Distributable
import org.apache.carbondata.core.datastore.row.CarbonRow
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.scan.expression.Expression
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.hadoop.CarbonMultiBlockSplit
+import org.apache.carbondata.hadoop.api.CarbonInputFormat
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
import org.apache.carbondata.processing.util.CarbonLoaderUtil
import org.apache.carbondata.store.worker.Status
@@ -212,11 +215,13 @@ class Master(sparkConf: SparkConf) {
// prune data and get a mapping of worker hostname to list of blocks,
// then add these blocks to the SearchRequest and fire the RPC call
val nodeBlockMapping: JMap[String, JList[Distributable]] = pruneBlock(table, columns, filter)
+ val fgDataMap = chooseFGDataMap(table, filter)
val tuple = nodeBlockMapping.asScala.map { case (splitAddress, blocks) =>
// Build a SearchRequest
val split = new SerializableWritable[CarbonMultiBlockSplit](
new CarbonMultiBlockSplit(blocks, splitAddress))
- val request = SearchRequest(queryId, split, table.getTableInfo, columns, filter, localLimit)
+ val request =
+ SearchRequest(queryId, split, table.getTableInfo, columns, filter, localLimit, fgDataMap)
// Find an Endpoint and send the request to it
// This RPC is non-blocking so that we do not need to wait before sending to the next worker
@@ -249,6 +254,14 @@ class Master(sparkConf: SparkConf) {
output.toArray
}
+ private def chooseFGDataMap(
+ table: CarbonTable,
+ filter: Expression): Option[DataMapExprWrapper] = {
+ val chooser = new DataMapChooser(table)
+ val filterInterface = table.resolveFilter(filter)
+ Option(chooser.chooseFGDataMap(filterInterface))
+ }
+
/**
* Prune data by using CarbonInputFormat.getSplit
* Return a mapping of host address to list of block
@@ -261,6 +274,9 @@ class Master(sparkConf: SparkConf) {
val job = new Job(jobConf)
val format = CarbonInputFormatUtil.createCarbonTableInputFormat(
job, table, columns, filter, null, null)
+
+ // We will do FG pruning on the reader (worker) side, so don't do it here
+ CarbonInputFormat.setFgDataMapPruning(job.getConfiguration, false)
val splits = format.getSplits(job)
val distributables = splits.asScala.map { split =>
split.asInstanceOf[Distributable]
http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/search/Searcher.scala b/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
index e467fd3..1532284 100644
--- a/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
+++ b/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
@@ -21,6 +21,7 @@ import org.apache.spark.SerializableWritable
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}
import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper
import org.apache.carbondata.core.metadata.schema.table.TableInfo
import org.apache.carbondata.core.scan.expression.Expression
import org.apache.carbondata.hadoop.CarbonMultiBlockSplit
@@ -37,11 +38,11 @@ class Searcher(override val rpcEnv: RpcEnv) extends RpcEndpoint {
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
- case req@SearchRequest(_, _, _, _, _, _) =>
+ case req: SearchRequest =>
val response = new SearchRequestHandler().handleSearch(req)
context.reply(response)
- case req@ShutdownRequest(_) =>
+ case req: ShutdownRequest =>
val response = new SearchRequestHandler().handleShutdown(req)
context.reply(response)
@@ -59,7 +60,8 @@ case class SearchRequest(
tableInfo: TableInfo,
projectColumns: Array[String],
filterExpression: Expression,
- limit: Long)
+ limit: Long,
+ dataMap: Option[DataMapExprWrapper])
// Search result sent from worker to master
case class SearchResult(