You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/07/11 05:18:00 UTC
[1/3] carbondata git commit: [CARBONDATA-1232] Datamap implementation
for Blocklet
Repository: carbondata
Updated Branches:
refs/heads/datamap 3ecb3ec58 -> 6d71d9c47
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index c328a64..01b4e61 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -17,14 +17,31 @@
package org.apache.carbondata.presto.impl;
-import com.facebook.presto.spi.SchemaTableName;
-import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.inject.Inject;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.*;
-import org.apache.carbondata.core.datastore.block.*;
+import org.apache.carbondata.core.datastore.DataRefNode;
+import org.apache.carbondata.core.datastore.DataRefNodeFinder;
+import org.apache.carbondata.core.datastore.IndexKey;
+import org.apache.carbondata.core.datastore.SegmentTaskIndexStore;
+import org.apache.carbondata.core.datastore.TableSegmentUniqueIdentifier;
+import org.apache.carbondata.core.datastore.block.AbstractIndex;
+import org.apache.carbondata.core.datastore.block.BlockletInfos;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.block.SegmentTaskIndexWrapper;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.datastore.exception.IndexBuilderException;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -52,18 +69,24 @@ import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.hadoop.CacheClient;
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+
+import com.facebook.presto.spi.SchemaTableName;
+import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.Inject;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.thrift.TBase;
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
import static java.util.Objects.requireNonNull;
/** CarbonTableReader will be a facade of these utils
@@ -312,8 +335,9 @@ public class CarbonTableReader {
TableBlockInfo tableBlockInfo = leafNode.getTableBlockInfo();
if (IUDTable) {
- if (CarbonUtil.isInvalidTableBlock(tableBlockInfo, invalidBlockVOForSegmentId,
- updateStatusManager)) {
+ if (CarbonUtil
+ .isInvalidTableBlock(tableBlockInfo.getSegmentId(), tableBlockInfo.getFilePath(),
+ invalidBlockVOForSegmentId, updateStatusManager)) {
continue;
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
index 277005b..da0d082 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
@@ -29,7 +29,8 @@ import org.apache.spark.sql.execution.command.CarbonMergerMapping
import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo}
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
-import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit}
+import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
import org.apache.carbondata.processing.model.CarbonLoadModel
@@ -71,7 +72,7 @@ class CarbonIUDMergerRDD[K, V](
var blocksOfLastSegment: List[TableBlockInfo] = null
- CarbonInputFormat.setSegmentsToAccess(
+ CarbonTableInputFormat.setSegmentsToAccess(
job.getConfiguration, carbonMergerMapping.validSegments.toList.asJava)
// get splits
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index caa389a..0d50882 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -43,8 +43,8 @@ import org.apache.carbondata.core.mutate.UpdateVO
import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit}
+import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, CarbonInputSplitTaskInfo}
import org.apache.carbondata.processing.merger._
import org.apache.carbondata.processing.model.CarbonLoadModel
@@ -286,7 +286,7 @@ class CarbonMergerRDD[K, V](
for (eachSeg <- carbonMergerMapping.validSegments) {
// map for keeping the relation of a task and its blocks.
- job.getConfiguration.set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, eachSeg)
+ job.getConfiguration.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, eachSeg)
if (updateStatusManager.getUpdateStatusDetails.length != 0) {
updateDetails = updateStatusManager.getInvalidTimestampRange(eachSeg)
@@ -308,7 +308,8 @@ class CarbonMergerRDD[K, V](
updateStatusManager.getDeleteDeltaFilePath(entry.getPath.toString)
)
((!updated) || ((updated) && (!CarbonUtil
- .isInvalidTableBlock(blockInfo, updateDetails, updateStatusManager))))
+ .isInvalidTableBlock(blockInfo.getSegmentId, blockInfo.getFilePath,
+ updateDetails, updateStatusManager))))
})
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 4807b90..62a6a92 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -40,6 +40,7 @@ import org.apache.carbondata.core.scan.model.QueryModel
import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants, QueryStatisticsRecorder}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
import org.apache.carbondata.hadoop._
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
import org.apache.carbondata.spark.load.CarbonLoaderUtil
@@ -245,21 +246,22 @@ class CarbonScanRDD(
iterator.asInstanceOf[Iterator[InternalRow]]
}
- private def prepareInputFormatForDriver(conf: Configuration): CarbonInputFormat[Object] = {
- CarbonInputFormat.setCarbonTable(conf, carbonTable)
+ private def prepareInputFormatForDriver(conf: Configuration): CarbonTableInputFormat[Object] = {
+ CarbonTableInputFormat.setCarbonTable(conf, carbonTable)
createInputFormat(conf)
}
- private def prepareInputFormatForExecutor(conf: Configuration): CarbonInputFormat[Object] = {
- CarbonInputFormat.setCarbonReadSupport(conf, readSupport)
+ private def prepareInputFormatForExecutor(conf: Configuration): CarbonTableInputFormat[Object] = {
+ CarbonTableInputFormat.setCarbonReadSupport(conf, readSupport)
createInputFormat(conf)
}
- private def createInputFormat(conf: Configuration): CarbonInputFormat[Object] = {
- val format = new CarbonInputFormat[Object]
- CarbonInputFormat.setTablePath(conf, identifier.appendWithLocalPrefix(identifier.getTablePath))
- CarbonInputFormat.setFilterPredicates(conf, filterExpression)
- CarbonInputFormat.setColumnProjection(conf, columnProjection)
+ private def createInputFormat(conf: Configuration): CarbonTableInputFormat[Object] = {
+ val format = new CarbonTableInputFormat[Object]
+ CarbonTableInputFormat.setTablePath(conf,
+ identifier.appendWithLocalPrefix(identifier.getTablePath))
+ CarbonTableInputFormat.setFilterPredicates(conf, filterExpression)
+ CarbonTableInputFormat.setColumnProjection(conf, columnProjection)
format
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
index 2ca3b8c..4950227 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.hadoop.CarbonInputFormat
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
/**
@@ -38,8 +38,8 @@ object QueryPlanUtil {
* createCarbonInputFormat from query model
*/
def createCarbonInputFormat(absoluteTableIdentifier: AbsoluteTableIdentifier) :
- (CarbonInputFormat[Array[Object]], Job) = {
- val carbonInputFormat = new CarbonInputFormat[Array[Object]]()
+ (CarbonTableInputFormat[Array[Object]], Job) = {
+ val carbonInputFormat = new CarbonTableInputFormat[Array[Object]]()
val jobConf: JobConf = new JobConf(new Configuration)
val job: Job = new Job(jobConf)
FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
@@ -47,8 +47,8 @@ object QueryPlanUtil {
}
def createCarbonInputFormat[V: ClassTag](absoluteTableIdentifier: AbsoluteTableIdentifier,
- conf: Configuration) : CarbonInputFormat[V] = {
- val carbonInputFormat = new CarbonInputFormat[V]()
+ conf: Configuration) : CarbonTableInputFormat[V] = {
+ val carbonInputFormat = new CarbonTableInputFormat[V]()
val job: Job = new Job(conf)
FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
carbonInputFormat
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 0e6153f..e3362e4 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -38,7 +38,8 @@ import org.apache.spark.util.SerializableConfiguration
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.scan.expression.logical.AndExpression
import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonProjection}
+import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection}
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, SchemaReader}
import org.apache.carbondata.processing.merger.TableMeta
import org.apache.carbondata.spark.{CarbonFilters, CarbonOption}
@@ -88,17 +89,17 @@ private[sql] case class CarbonDatasourceHadoopRelation(
filters.flatMap { filter =>
CarbonFilters.createCarbonFilter(dataSchema, filter)
}.reduceOption(new AndExpression(_, _))
- .foreach(CarbonInputFormat.setFilterPredicates(conf, _))
+ .foreach(CarbonTableInputFormat.setFilterPredicates(conf, _))
val projection = new CarbonProjection
requiredColumns.foreach(projection.addColumn)
- CarbonInputFormat.setColumnProjection(conf, projection)
- CarbonInputFormat.setCarbonReadSupport(conf, classOf[SparkRowReadSupportImpl])
+ CarbonTableInputFormat.setColumnProjection(conf, projection)
+ CarbonTableInputFormat.setCarbonReadSupport(conf, classOf[SparkRowReadSupportImpl])
new CarbonHadoopFSRDD[Row](sqlContext.sparkContext,
new SerializableConfiguration(conf),
absIdentifier,
- classOf[CarbonInputFormat[Row]],
+ classOf[CarbonTableInputFormat[Row]],
classOf[Row]
)
}
@@ -118,7 +119,7 @@ class CarbonHadoopFSRDD[V: ClassTag](
@transient sc: SparkContext,
conf: SerializableConfiguration,
identifier: AbsoluteTableIdentifier,
- inputFormatClass: Class[_ <: CarbonInputFormat[V]],
+ inputFormatClass: Class[_ <: CarbonTableInputFormat[V]],
valueClass: Class[V])
extends RDD[V](sc, Nil) with SparkHadoopMapReduceUtil {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
index a292cde..c38f0e1 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
@@ -133,13 +133,6 @@ private[sql] case class ProjectForUpdateCommand(
override def run(sqlContext: SQLContext): Seq[Row] = {
-
- // sqlContext.sparkContext.setLocalProperty(org.apache.spark.sql.execution.SQLExecution
- // .EXECUTION_ID_KEY, null)
- // DataFrame(sqlContext, plan).show(truncate = false)
- // return Seq.empty
-
-
val res = plan find {
case relation: LogicalRelation if (relation.relation
.isInstanceOf[CarbonDatasourceRelation]) =>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
index 70c7caf..e0a8b58 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.hadoop.CarbonInputFormat
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
/**
* All the utility functions for carbon plan creation
@@ -37,8 +37,8 @@ object QueryPlanUtil {
* createCarbonInputFormat from query model
*/
def createCarbonInputFormat(absoluteTableIdentifier: AbsoluteTableIdentifier) :
- (CarbonInputFormat[Array[Object]], Job) = {
- val carbonInputFormat = new CarbonInputFormat[Array[Object]]()
+ (CarbonTableInputFormat[Array[Object]], Job) = {
+ val carbonInputFormat = new CarbonTableInputFormat[Array[Object]]()
val jobConf: JobConf = new JobConf(new Configuration)
val job: Job = new Job(jobConf)
FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
@@ -46,8 +46,8 @@ object QueryPlanUtil {
}
def createCarbonInputFormat[V: ClassTag](absoluteTableIdentifier: AbsoluteTableIdentifier,
- conf: Configuration) : CarbonInputFormat[V] = {
- val carbonInputFormat = new CarbonInputFormat[V]()
+ conf: Configuration) : CarbonTableInputFormat[V] = {
+ val carbonInputFormat = new CarbonTableInputFormat[V]()
val job: Job = new Job(conf)
FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
carbonInputFormat
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index 04a94ce..9fde546 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -40,7 +40,8 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFile
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.datastore.impl.FileFactory.FileType
import org.apache.carbondata.core.fileoperations.FileWriteOperation
-import org.apache.carbondata.core.metadata.{CarbonMetadata, CarbonTableIdentifier}
+import org.apache.carbondata.core.indexstore.{DataMapStoreManager, DataMapType, TableDataMap}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
import org.apache.carbondata.core.metadata.datatype.DataType.DECIMAL
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -511,8 +512,10 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) {
val dbName = tableIdentifier.database.get
val tableName = tableIdentifier.table
- val metadataFilePath = CarbonStorePath.getCarbonTablePath(tableStorePath,
- new CarbonTableIdentifier(dbName, tableName, "")).getMetadataDirectoryPath
+ val tablePath = CarbonStorePath.getCarbonTablePath(tableStorePath,
+ new CarbonTableIdentifier(dbName, tableName, ""))
+ val metadataFilePath = tablePath.getMetadataDirectoryPath
+ val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath.getPath)
val fileType = FileFactory.getFileType(metadataFilePath)
@@ -528,6 +531,7 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) {
case Some(tableMeta) =>
metadata.tablesMeta -= tableMeta
CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName)
+ DataMapStoreManager.getInstance.clearDataMap(identifier, "blocklet")
updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
case None =>
LOGGER.info(s"Metadata does not contain entry for table $tableName in database $dbName")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
index 8cdcd26..99bfd44 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
@@ -18,6 +18,7 @@ package org.apache.carbondata.processing.merger;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -379,4 +380,35 @@ public class CarbonCompactionUtil {
}
return restructuredBlockExists;
}
+
+ /**
+ * This method will check for any restructured block in the blocks selected for compaction
+ *
+ * @param segmentMapping
+ * @param tableLastUpdatedTime
+ * @return
+ */
+ public static boolean checkIfAnyRestructuredBlockExists(Map<String, TaskBlockInfo> segmentMapping,
+ long tableLastUpdatedTime) {
+ boolean restructuredBlockExists = false;
+ for (Map.Entry<String, TaskBlockInfo> taskMap : segmentMapping.entrySet()) {
+ String segmentId = taskMap.getKey();
+ TaskBlockInfo taskBlockInfo = taskMap.getValue();
+ Collection<List<TableBlockInfo>> infoList = taskBlockInfo.getAllTableBlockInfoList();
+ for (List<TableBlockInfo> listMetadata : infoList) {
+ for (TableBlockInfo blockInfo : listMetadata) {
+ // if schema modified timestamp is greater than footer stored schema timestamp,
+ // it indicates it is a restructured block
+ if (tableLastUpdatedTime > blockInfo.getDetailInfo().getSchemaUpdatedTimeStamp()) {
+ restructuredBlockExists = true;
+ break;
+ }
+ }
+ }
+ if (restructuredBlockExists) {
+ break;
+ }
+ }
+ return restructuredBlockExists;
+ }
}
[2/3] carbondata git commit: [CARBONDATA-1232] Datamap implementation
for Blocklet
Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
new file mode 100644
index 0000000..defe766
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.row;
+
+import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
+
+/**
+ * It is just a normal row to store data. Implementation classes could be safe and unsafe.
+ * TODO move this class a global row and use across loading after DataType is changed class
+ */
+public abstract class DataMapRow {
+
+ protected DataMapSchema[] schemas;
+
+ public DataMapRow(DataMapSchema[] schemas) {
+ this.schemas = schemas;
+ }
+
+ public abstract byte[] getByteArray(int ordinal);
+
+ public abstract DataMapRow getRow(int ordinal);
+
+ public abstract void setRow(DataMapRow row, int ordinal);
+
+ public abstract void setByteArray(byte[] byteArray, int ordinal);
+
+ public abstract int getInt(int ordinal);
+
+ public abstract void setInt(int value, int ordinal);
+
+ public abstract void setByte(byte value, int ordinal);
+
+ public abstract byte getByte(int ordinal);
+
+ public abstract void setShort(short value, int ordinal);
+
+ public abstract short getShort(int ordinal);
+
+ public abstract void setLong(long value, int ordinal);
+
+ public abstract long getLong(int ordinal);
+
+ public abstract void setFloat(float value, int ordinal);
+
+ public abstract float getFloat(int ordinal);
+
+ public abstract void setDouble(double value, int ordinal);
+
+ public abstract double getDouble(int ordinal);
+
+ public int getTotalSizeInBytes() {
+ int len = 0;
+ for (int i = 0; i < schemas.length; i++) {
+ len += getSizeInBytes(i);
+ }
+ return len;
+ }
+
+ public int getSizeInBytes(int ordinal) {
+ switch (schemas[ordinal].getSchemaType()) {
+ case FIXED:
+ return schemas[ordinal].getLength();
+ case VARIABLE:
+ return getByteArray(ordinal).length + 2;
+ case STRUCT:
+ return getRow(ordinal).getTotalSizeInBytes();
+ default:
+ throw new UnsupportedOperationException("wrong type");
+ }
+ }
+
+ public int getColumnCount() {
+ return schemas.length;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
new file mode 100644
index 0000000..adec346
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.row;
+
+import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/**
+ * Data map row.
+ */
+public class DataMapRowImpl extends DataMapRow {
+
+ private Object[] data;
+
+ public DataMapRowImpl(DataMapSchema[] schemas) {
+ super(schemas);
+ this.data = new Object[schemas.length];
+ }
+
+ @Override public byte[] getByteArray(int ordinal) {
+ return (byte[]) data[ordinal];
+ }
+
+ @Override public DataMapRow getRow(int ordinal) {
+ return (DataMapRow) data[ordinal];
+ }
+
+ @Override public void setByteArray(byte[] byteArray, int ordinal) {
+ assert (schemas[ordinal].getDataType() == DataType.BYTE_ARRAY);
+ data[ordinal] = byteArray;
+ }
+
+ @Override public int getInt(int ordinal) {
+ return (Integer) data[ordinal];
+ }
+
+ @Override public void setInt(int value, int ordinal) {
+ assert (schemas[ordinal].getDataType() == DataType.INT);
+ data[ordinal] = value;
+ }
+
+ @Override public void setByte(byte value, int ordinal) {
+ assert (schemas[ordinal].getDataType() == DataType.BYTE);
+ data[ordinal] = value;
+ }
+
+ @Override public byte getByte(int ordinal) {
+ return (Byte) data[ordinal];
+ }
+
+ @Override public void setShort(short value, int ordinal) {
+ assert (schemas[ordinal].getDataType() == DataType.SHORT);
+ data[ordinal] = value;
+ }
+
+ @Override public short getShort(int ordinal) {
+ return (Short) data[ordinal];
+ }
+
+ @Override public void setLong(long value, int ordinal) {
+ assert (schemas[ordinal].getDataType() == DataType.LONG);
+ data[ordinal] = value;
+ }
+
+ @Override public long getLong(int ordinal) {
+ return (Long) data[ordinal];
+ }
+
+ @Override public void setFloat(float value, int ordinal) {
+ assert (schemas[ordinal].getDataType() == DataType.FLOAT);
+ data[ordinal] = value;
+ }
+
+ @Override public float getFloat(int ordinal) {
+ return (Float) data[ordinal];
+ }
+
+ @Override public void setDouble(double value, int ordinal) {
+ assert (schemas[ordinal].getDataType() == DataType.DOUBLE);
+ data[ordinal] = value;
+ }
+
+ @Override public void setRow(DataMapRow row, int ordinal) {
+ assert (schemas[ordinal].getDataType() == DataType.STRUCT);
+ data[ordinal] = row;
+ }
+
+ @Override public double getDouble(int ordinal) {
+ return (Double) data[ordinal];
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
new file mode 100644
index 0000000..ef78514
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.indexstore.row;
+
+import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
+import org.apache.carbondata.core.memory.MemoryBlock;
+
+import static org.apache.carbondata.core.memory.CarbonUnsafe.BYTE_ARRAY_OFFSET;
+import static org.apache.carbondata.core.memory.CarbonUnsafe.unsafe;
+
/**
 * Unsafe (off-heap) implementation of a data map row.
 *
 * Typed values are read directly from the backing {@link MemoryBlock} at byte
 * offset {@code pointer}. Instances are read-only views: every setter throws
 * {@link UnsupportedOperationException}; the serialized bytes are produced by
 * a writer elsewhere. Not thread-safety-documented here — assumed read-only
 * after construction.
 */
public class UnsafeDataMapRow extends DataMapRow {

  // Backing memory that holds this row's serialized bytes.
  private MemoryBlock block;

  // Byte offset of the start of this row inside 'block'.
  private int pointer;

  public UnsafeDataMapRow(DataMapSchema[] schemas, MemoryBlock block, int pointer) {
    super(schemas);
    this.block = block;
    this.pointer = pointer;
  }

  /**
   * Copies the byte[] value at {@code ordinal} out of unsafe memory.
   * VARIABLE columns are laid out as [short length][bytes]; fixed columns use
   * the schema-declared length.
   */
  @Override public byte[] getByteArray(int ordinal) {
    int length;
    int position = getPosition(ordinal);
    switch (schemas[ordinal].getSchemaType()) {
      case VARIABLE:
        // variable-width value: 2-byte length prefix, then the payload
        length = unsafe.getShort(block.getBaseObject(), block.getBaseOffset() + pointer + position);
        position += 2;
        break;
      default:
        // fixed-width value: length comes from the schema
        length = schemas[ordinal].getLength();
    }
    byte[] data = new byte[length];
    unsafe.copyMemory(block.getBaseObject(), block.getBaseOffset() + pointer + position, data,
        BYTE_ARRAY_OFFSET, data.length);
    return data;
  }

  /**
   * Returns a child row view for a STRUCT column.
   * NOTE(review): assumes the struct's children are serialized inline starting
   * at this column's position — confirm against the row writer.
   */
  @Override public DataMapRow getRow(int ordinal) {
    DataMapSchema[] childSchemas =
        ((DataMapSchema.StructDataMapSchema) schemas[ordinal]).getChildSchemas();
    return new UnsafeDataMapRow(childSchemas, block, pointer + getPosition(ordinal));
  }

  @Override public void setByteArray(byte[] byteArray, int ordinal) {
    throw new UnsupportedOperationException("Not supported to set on unsafe row");
  }

  /** Reads a 4-byte int at the column's computed offset. */
  @Override public int getInt(int ordinal) {
    return unsafe
        .getInt(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal));
  }

  @Override public void setInt(int value, int ordinal) {
    throw new UnsupportedOperationException("Not supported to set on unsafe row");
  }

  @Override public void setByte(byte value, int ordinal) {
    throw new UnsupportedOperationException("Not supported to set on unsafe row");
  }

  /** Reads a 1-byte value at the column's computed offset. */
  @Override public byte getByte(int ordinal) {
    return unsafe
        .getByte(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal));
  }

  @Override public void setShort(short value, int ordinal) {
    throw new UnsupportedOperationException("Not supported to set on unsafe row");
  }

  /** Reads a 2-byte short at the column's computed offset. */
  @Override public short getShort(int ordinal) {
    return unsafe
        .getShort(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal));
  }

  @Override public void setLong(long value, int ordinal) {
    throw new UnsupportedOperationException("Not supported to set on unsafe row");
  }

  /** Reads an 8-byte long at the column's computed offset. */
  @Override public long getLong(int ordinal) {
    return unsafe
        .getLong(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal));
  }

  @Override public void setFloat(float value, int ordinal) {
    throw new UnsupportedOperationException("Not supported to set on unsafe row");
  }

  /** Reads a 4-byte float at the column's computed offset. */
  @Override public float getFloat(int ordinal) {
    return unsafe
        .getFloat(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal));
  }

  @Override public void setDouble(double value, int ordinal) {
    throw new UnsupportedOperationException("Not supported to set on unsafe row");
  }

  /** Reads an 8-byte double at the column's computed offset. */
  @Override public double getDouble(int ordinal) {
    return unsafe
        .getDouble(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal));
  }

  @Override public void setRow(DataMapRow row, int ordinal) {
    throw new UnsupportedOperationException("Not supported to set on unsafe row");
  }

  /**
   * Byte offset of column {@code ordinal} relative to the row start: linear
   * scan summing the serialized widths of all preceding columns, so access is
   * O(ordinal) per call.
   * NOTE(review): assumes the inherited getSizeInBytes(i) returns the full
   * serialized width of column i including any variable-length prefix —
   * TODO confirm against DataMapRow.
   */
  private int getPosition(int ordinal) {
    int position = 0;
    for (int i = 0; i < ordinal; i++) {
      position += getSizeInBytes(i);
    }
    return position;
  }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/schema/DataMapSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/DataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/DataMapSchema.java
new file mode 100644
index 0000000..80c68ac
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/DataMapSchema.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.schema;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/**
+ * It just have 2 types right now, either fixed or variable.
+ */
+public abstract class DataMapSchema {
+
+ protected DataType dataType;
+
+ public DataMapSchema(DataType dataType) {
+ this.dataType = dataType;
+ }
+
+ /**
+ * Either fixed or variable length.
+ *
+ * @return
+ */
+ public DataType getDataType() {
+ return dataType;
+ }
+
+ /**
+ * Gives length in case of fixed schema other wise returns length
+ *
+ * @return
+ */
+ public abstract int getLength();
+
+ /**
+ * schema type
+ * @return
+ */
+ public abstract DataMapSchemaType getSchemaType();
+
+ /*
+ * It has always fixed length, length cannot be updated later.
+ * Usage examples : all primitive types like short, int etc
+ */
+ public static class FixedDataMapSchema extends DataMapSchema {
+
+ private int length;
+
+ public FixedDataMapSchema(DataType dataType) {
+ super(dataType);
+ }
+
+ public FixedDataMapSchema(DataType dataType, int length) {
+ super(dataType);
+ this.length = length;
+ }
+
+ @Override public int getLength() {
+ if (length == 0) {
+ return dataType.getSizeInBytes();
+ } else {
+ return length;
+ }
+ }
+
+ @Override public DataMapSchemaType getSchemaType() {
+ return DataMapSchemaType.FIXED;
+ }
+ }
+
+ public static class VariableDataMapSchema extends DataMapSchema {
+
+ public VariableDataMapSchema(DataType dataType) {
+ super(dataType);
+ }
+
+ @Override public int getLength() {
+ return dataType.getSizeInBytes();
+ }
+
+ @Override public DataMapSchemaType getSchemaType() {
+ return DataMapSchemaType.VARIABLE;
+ }
+ }
+
+ public static class StructDataMapSchema extends DataMapSchema {
+
+ private DataMapSchema[] childSchemas;
+
+ public StructDataMapSchema(DataType dataType, DataMapSchema[] childSchemas) {
+ super(dataType);
+ this.childSchemas = childSchemas;
+ }
+
+ @Override public int getLength() {
+ return dataType.getSizeInBytes();
+ }
+
+ public DataMapSchema[] getChildSchemas() {
+ return childSchemas;
+ }
+
+ @Override public DataMapSchemaType getSchemaType() {
+ return DataMapSchemaType.STRUCT;
+ }
+ }
+
+ public enum DataMapSchemaType {
+ FIXED, VARIABLE, STRUCT
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/schema/FilterType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/FilterType.java b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/FilterType.java
new file mode 100644
index 0000000..9d77010
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/FilterType.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.schema;
+
/**
 * Filter condition kinds a select query can push down to the datamap index.
 */
public enum FilterType {
  EQUALTO, GREATER_THAN, LESS_THAN, GREATER_THAN_EQUAL, LESS_THAN_EQUAL, LIKE
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
index bfa9d7e..f81f805 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
@@ -17,16 +17,22 @@
package org.apache.carbondata.core.metadata.blocklet;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.List;
import org.apache.carbondata.core.metadata.blocklet.datachunk.DataChunk;
import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex;
+import org.apache.hadoop.io.Writable;
+
/**
* class to store the information about the blocklet
*/
-public class BlockletInfo implements Serializable {
+public class BlockletInfo implements Serializable, Writable {
/**
* serialization id
@@ -189,4 +195,49 @@ public class BlockletInfo implements Serializable {
this.numberOfPages = numberOfPages;
}
  /**
   * Serializes this blocklet's chunk layout: dimension offset, measure offset,
   * then dimension chunk offsets+lengths, then measure chunk offsets+lengths.
   * Field order MUST stay in lock-step with readFields().
   * NOTE(review): sizes are written with writeShort — silently truncates if a
   * blocklet ever has more than Short.MAX_VALUE column chunks; confirm bound.
   * NOTE(review): assumes dimensionChunksLength (resp. measureChunksLength)
   * is non-null and the same size as the offsets list whenever offsets are
   * non-null — verify at call sites.
   */
  @Override public void write(DataOutput output) throws IOException {
    output.writeLong(dimensionOffset);
    output.writeLong(measureOffsets);
    int dsize = dimensionChunkOffsets != null ? dimensionChunkOffsets.size() : 0;
    output.writeShort(dsize);
    for (int i = 0; i < dsize; i++) {
      output.writeLong(dimensionChunkOffsets.get(i));
    }
    for (int i = 0; i < dsize; i++) {
      output.writeInt(dimensionChunksLength.get(i));
    }
    int mSize = measureChunkOffsets != null ? measureChunkOffsets.size() : 0;
    output.writeShort(mSize);
    for (int i = 0; i < mSize; i++) {
      output.writeLong(measureChunkOffsets.get(i));
    }
    for (int i = 0; i < mSize; i++) {
      output.writeInt(measureChunksLength.get(i));
    }
  }
+
  /**
   * Deserializes the chunk layout previously produced by write(); reads fields
   * in exactly the same order (dimension offset, measure offset, dimension
   * chunk offsets+lengths, measure chunk offsets+lengths). Keep in lock-step
   * with write().
   */
  @Override public void readFields(DataInput input) throws IOException {
    dimensionOffset = input.readLong();
    measureOffsets = input.readLong();
    short dimensionChunkOffsetsSize = input.readShort();
    dimensionChunkOffsets = new ArrayList<>(dimensionChunkOffsetsSize);
    for (int i = 0; i < dimensionChunkOffsetsSize; i++) {
      dimensionChunkOffsets.add(input.readLong());
    }
    // lengths list has the same cardinality as the offsets list by construction
    dimensionChunksLength = new ArrayList<>(dimensionChunkOffsetsSize);
    for (int i = 0; i < dimensionChunkOffsetsSize; i++) {
      dimensionChunksLength.add(input.readInt());
    }

    short measureChunkOffsetsSize = input.readShort();
    measureChunkOffsets = new ArrayList<>(measureChunkOffsetsSize);
    for (int i = 0; i < measureChunkOffsetsSize; i++) {
      measureChunkOffsets.add(input.readLong());
    }
    measureChunksLength = new ArrayList<>(measureChunkOffsetsSize);
    for (int i = 0; i < measureChunkOffsetsSize; i++) {
      measureChunksLength.add(input.readInt());
    }

  }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/metadata/index/BlockIndexInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/index/BlockIndexInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/index/BlockIndexInfo.java
index cd86a07..ae99ed8 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/index/BlockIndexInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/index/BlockIndexInfo.java
@@ -16,6 +16,7 @@
*/
package org.apache.carbondata.core.metadata.index;
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex;
/**
@@ -45,6 +46,11 @@ public class BlockIndexInfo {
private BlockletIndex blockletIndex;
/**
+ * to store blocklet info like offsets and lengths of each column.
+ */
+ private BlockletInfo blockletInfo;
+
+ /**
* Constructor
*
* @param numberOfRows number of rows
@@ -61,6 +67,20 @@ public class BlockIndexInfo {
}
/**
+ *
+ * @param numberOfRows
+ * @param fileName
+ * @param offset
+ * @param blockletIndex
+ * @param blockletInfo
+ */
  public BlockIndexInfo(long numberOfRows, String fileName, long offset,
      BlockletIndex blockletIndex, BlockletInfo blockletInfo) {
    // delegate basic fields to the 4-arg constructor, then additionally keep
    // the per-column offset/length metadata of the blocklet
    this(numberOfRows, fileName, offset, blockletIndex);
    this.blockletInfo = blockletInfo;
  }
+
+ /**
* @return the numberOfRows
*/
public long getNumberOfRows() {
@@ -87,4 +107,11 @@ public class BlockIndexInfo {
public BlockletIndex getBlockletIndex() {
return blockletIndex;
}
+
+ /**
+ * @return BlockletInfo
+ */
  public BlockletInfo getBlockletInfo() {
    // may be null when this object was built via the 4-arg constructor
    return blockletInfo;
  }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index ff54673..e0ee5bb 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -21,8 +21,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -41,6 +43,7 @@ import org.apache.carbondata.core.datastore.block.AbstractIndex;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.datastore.block.TableBlockUniqueIdentifier;
+import org.apache.carbondata.core.indexstore.blockletindex.IndexWrapper;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.keygenerator.KeyGenerator;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -116,23 +119,40 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
// so block will be loaded in sorted order this will be required for
// query execution
Collections.sort(queryModel.getTableBlockInfos());
- // get the table blocks
- CacheProvider cacheProvider = CacheProvider.getInstance();
- BlockIndexStore<TableBlockUniqueIdentifier, AbstractIndex> cache =
- (BlockIndexStore) cacheProvider
- .createCache(CacheType.EXECUTOR_BTREE, queryModel.getTable().getStorePath());
- // remove the invalid table blocks, block which is deleted or compacted
- cache.removeTableBlocks(queryModel.getInvalidSegmentIds(),
- queryModel.getAbsoluteTableIdentifier());
- List<TableBlockUniqueIdentifier> tableBlockUniqueIdentifiers =
- prepareTableBlockUniqueIdentifier(queryModel.getTableBlockInfos(),
- queryModel.getAbsoluteTableIdentifier());
- cache.removeTableBlocksIfHorizontalCompactionDone(queryModel);
- queryProperties.dataBlocks = cache.getAll(tableBlockUniqueIdentifiers);
- queryStatistic
- .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_EXECUTOR, System.currentTimeMillis());
- queryProperties.queryStatisticsRecorder.recordStatistics(queryStatistic);
+ if (queryModel.getTableBlockInfos().get(0).getDetailInfo() != null) {
+ List<AbstractIndex> indexList = new ArrayList<>();
+ Map<String, List<TableBlockInfo>> listMap = new LinkedHashMap<>();
+ for (TableBlockInfo blockInfo: queryModel.getTableBlockInfos()) {
+ List<TableBlockInfo> tableBlockInfos = listMap.get(blockInfo.getFilePath());
+ if (tableBlockInfos == null) {
+ tableBlockInfos = new ArrayList<>();
+ listMap.put(blockInfo.getFilePath(), tableBlockInfos);
+ }
+ tableBlockInfos.add(blockInfo);
+ }
+ for (List<TableBlockInfo> tableBlockInfos: listMap.values()) {
+ indexList.add(new IndexWrapper(tableBlockInfos));
+ }
+ queryProperties.dataBlocks = indexList;
+ } else {
+ // get the table blocks
+ CacheProvider cacheProvider = CacheProvider.getInstance();
+ BlockIndexStore<TableBlockUniqueIdentifier, AbstractIndex> cache =
+ (BlockIndexStore) cacheProvider
+ .createCache(CacheType.EXECUTOR_BTREE, queryModel.getTable().getStorePath());
+ // remove the invalid table blocks, block which is deleted or compacted
+ cache.removeTableBlocks(queryModel.getInvalidSegmentIds(),
+ queryModel.getAbsoluteTableIdentifier());
+ List<TableBlockUniqueIdentifier> tableBlockUniqueIdentifiers =
+ prepareTableBlockUniqueIdentifier(queryModel.getTableBlockInfos(),
+ queryModel.getAbsoluteTableIdentifier());
+ cache.removeTableBlocksIfHorizontalCompactionDone(queryModel);
+ queryProperties.dataBlocks = cache.getAll(tableBlockUniqueIdentifiers);
+ queryStatistic
+ .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_EXECUTOR, System.currentTimeMillis());
+ queryProperties.queryStatisticsRecorder.recordStatistics(queryStatistic);
+ }
// calculating the total number of aggeragted columns
int aggTypeCount = queryModel.getQueryMeasures().size();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
index 8704496..a874835 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
@@ -156,7 +156,7 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
int columnIndex = dimColumnEvaluatorInfo.getColumnIndex();
int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping().get(columnIndex);
- boolean isScanRequired =
+ boolean isScanRequired = blockIndex >= blkMaxVal.length ||
isScanRequired(blkMaxVal[blockIndex], blkMinVal[blockIndex], filterValues);
if (isScanRequired) {
bitSet.set(0);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
index 6823531..c2e077e 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
@@ -287,7 +287,7 @@ public class RangeValueFilterExecuterImpl extends ValueBasedFilterExecuterImpl {
BitSet bitSet = new BitSet(1);
byte[][] filterValues = this.filterRangesValues;
int columnIndex = this.dimColEvaluatorInfo.getColumnIndex();
- boolean isScanRequired =
+ boolean isScanRequired = columnIndex >= blockMinValue.length ||
isScanRequired(blockMinValue[columnIndex], blockMaxValue[columnIndex], filterValues);
if (isScanRequired) {
bitSet.set(0);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
index be82be7..73352cb 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
@@ -79,7 +79,7 @@ public class RowLevelRangeGrtThanFiterExecuterImpl extends RowLevelFilterExecute
@Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
BitSet bitSet = new BitSet(1);
- boolean isScanRequired =
+ boolean isScanRequired = dimensionBlocksIndex[0] >= blockMaxValue.length ||
isScanRequired(blockMaxValue[dimensionBlocksIndex[0]], filterRangeValues);
if (isScanRequired) {
bitSet.set(0);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
index 53da6c5..6e8e188 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
@@ -81,7 +81,7 @@ public class RowLevelRangeGrtrThanEquaToFilterExecuterImpl extends RowLevelFilte
@Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
BitSet bitSet = new BitSet(1);
- boolean isScanRequired =
+ boolean isScanRequired = dimensionBlocksIndex[0] >= blockMaxValue.length ||
isScanRequired(blockMaxValue[dimensionBlocksIndex[0]], filterRangeValues);
if (isScanRequired) {
bitSet.set(0);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
index d694960..d6f7c86 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
@@ -81,7 +81,7 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl extends RowLevelFilter
@Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
BitSet bitSet = new BitSet(1);
- boolean isScanRequired =
+ boolean isScanRequired = dimensionBlocksIndex[0] >= blockMaxValue.length ||
isScanRequired(blockMinValue[dimensionBlocksIndex[0]], filterRangeValues);
if (isScanRequired) {
bitSet.set(0);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
index b3dd921..597ba52 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
@@ -82,7 +82,7 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecut
@Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
BitSet bitSet = new BitSet(1);
- boolean isScanRequired =
+ boolean isScanRequired = dimensionBlocksIndex[0] >= blockMaxValue.length ||
isScanRequired(blockMinValue[dimensionBlocksIndex[0]], filterRangeValues);
if (isScanRequired) {
bitSet.set(0);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
index fdb5483..ff4f5dd 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
@@ -165,6 +165,9 @@ public abstract class AbstractDataBlockIterator extends CarbonIterator<List<Obje
new BlocksChunkHolder(blockExecutionInfo.getTotalNumberDimensionBlock(),
blockExecutionInfo.getTotalNumberOfMeasureBlock(), fileReader);
blocksChunkHolder.setDataBlock(dataBlockIterator.next());
+ if (blocksChunkHolder.getDataBlock().getColumnsMaxValue() == null) {
+ return blocksChunkHolder;
+ }
if (blockletScanner.isScanRequired(blocksChunkHolder)) {
return blocksChunkHolder;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
index 92e9594..95030d3 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.datastore.FileHolder;
import org.apache.carbondata.core.datastore.block.AbstractIndex;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.datastore.impl.btree.BTreeDataRefNodeFinder;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataRefNodeWrapper;
import org.apache.carbondata.core.mutate.DeleteDeltaVo;
import org.apache.carbondata.core.reader.CarbonDeleteFilesDataReader;
import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
@@ -127,20 +128,27 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
// set the deleted row to block execution info
blockInfo.setDeletedRecordsMap(deletedRowsMap);
}
- DataRefNode startDataBlock = finder
- .findFirstDataBlock(blockInfo.getDataBlock().getDataRefNode(), blockInfo.getStartKey());
- while (startDataBlock.nodeNumber() < blockInfo.getStartBlockletIndex()) {
- startDataBlock = startDataBlock.getNextDataRefNode();
- }
- long numberOfBlockToScan = blockInfo.getNumberOfBlockletToScan();
- //if number of block is less than 0 then take end block.
- if (numberOfBlockToScan <= 0) {
- DataRefNode endDataBlock = finder
- .findLastDataBlock(blockInfo.getDataBlock().getDataRefNode(), blockInfo.getEndKey());
- numberOfBlockToScan = endDataBlock.nodeNumber() - startDataBlock.nodeNumber() + 1;
+ DataRefNode dataRefNode = blockInfo.getDataBlock().getDataRefNode();
+ if (dataRefNode instanceof BlockletDataRefNodeWrapper) {
+ BlockletDataRefNodeWrapper wrapper = (BlockletDataRefNodeWrapper) dataRefNode;
+ blockInfo.setFirstDataBlock(wrapper);
+ blockInfo.setNumberOfBlockToScan(wrapper.numberOfNodes());
+
+ } else {
+ DataRefNode startDataBlock =
+ finder.findFirstDataBlock(dataRefNode, blockInfo.getStartKey());
+ while (startDataBlock.nodeNumber() < blockInfo.getStartBlockletIndex()) {
+ startDataBlock = startDataBlock.getNextDataRefNode();
+ }
+ long numberOfBlockToScan = blockInfo.getNumberOfBlockletToScan();
+ //if number of block is less than 0 then take end block.
+ if (numberOfBlockToScan <= 0) {
+ DataRefNode endDataBlock = finder.findLastDataBlock(dataRefNode, blockInfo.getEndKey());
+ numberOfBlockToScan = endDataBlock.nodeNumber() - startDataBlock.nodeNumber() + 1;
+ }
+ blockInfo.setFirstDataBlock(startDataBlock);
+ blockInfo.setNumberOfBlockToScan(numberOfBlockToScan);
}
- blockInfo.setFirstDataBlock(startDataBlock);
- blockInfo.setNumberOfBlockToScan(numberOfBlockToScan);
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
index 97b1a1f..34c7709 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
@@ -122,6 +122,57 @@ public abstract class AbstractDataFileFooterConverter {
}
/**
+ * Below method will be used to get the index info from index file
+ *
+ * @param filePath file path of the index file
+ * @return list of index info
+ * @throws IOException problem while reading the index file
+ */
+ public List<DataFileFooter> getIndexInfo(String filePath) throws IOException {
+ CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
+ List<DataFileFooter> dataFileFooters = new ArrayList<DataFileFooter>();
+ String parentPath = filePath.substring(0, filePath.lastIndexOf("/"));
+ try {
+ // open the reader
+ indexReader.openThriftReader(filePath);
+ // get the index header
+ org.apache.carbondata.format.IndexHeader readIndexHeader = indexReader.readIndexHeader();
+ List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
+ List<org.apache.carbondata.format.ColumnSchema> table_columns =
+ readIndexHeader.getTable_columns();
+ for (int i = 0; i < table_columns.size(); i++) {
+ columnSchemaList.add(thriftColumnSchmeaToWrapperColumnSchema(table_columns.get(i)));
+ }
+ // get the segment info
+ SegmentInfo segmentInfo = getSegmentInfo(readIndexHeader.getSegment_info());
+ BlockletIndex blockletIndex = null;
+ DataFileFooter dataFileFooter = null;
+ // read the block info from file
+ while (indexReader.hasNext()) {
+ BlockIndex readBlockIndexInfo = indexReader.readBlockIndexInfo();
+ blockletIndex = getBlockletIndex(readBlockIndexInfo.getBlock_index());
+ dataFileFooter = new DataFileFooter();
+ TableBlockInfo tableBlockInfo = new TableBlockInfo();
+ tableBlockInfo.setBlockOffset(readBlockIndexInfo.getOffset());
+ tableBlockInfo.setVersion(
+ ColumnarFormatVersion.valueOf((short) readIndexHeader.getVersion()));
+ int blockletSize = getBlockletSize(readBlockIndexInfo);
+ tableBlockInfo.getBlockletInfos().setNoOfBlockLets(blockletSize);
+ tableBlockInfo.setFilePath(parentPath + "/" + readBlockIndexInfo.file_name);
+ dataFileFooter.setBlockletIndex(blockletIndex);
+ dataFileFooter.setColumnInTable(columnSchemaList);
+ dataFileFooter.setNumberOfRows(readBlockIndexInfo.getNum_rows());
+ dataFileFooter.setBlockInfo(new BlockInfo(tableBlockInfo));
+ dataFileFooter.setSegmentInfo(segmentInfo);
+ dataFileFooters.add(dataFileFooter);
+ }
+ } finally {
+ indexReader.closeThriftReader();
+ }
+ return dataFileFooters;
+ }
+
+ /**
* the methods returns the number of blocklets in a block
*
* @param readBlockIndexInfo
@@ -148,6 +199,8 @@ public abstract class AbstractDataFileFooterConverter {
public abstract DataFileFooter readDataFileFooter(TableBlockInfo tableBlockInfo)
throws IOException;
+ public abstract List<ColumnSchema> getSchema(TableBlockInfo tableBlockInfo) throws IOException;
+
/**
* Below method will be used to get blocklet index for data file meta
*
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 200d5ca..51296d8 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -51,10 +51,13 @@ import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.datastore.page.statistics.MeasurePageStatsVO;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.metadata.ValueEncoderMeta;
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.blocklet.SegmentInfo;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
@@ -912,10 +915,26 @@ public final class CarbonUtil {
* Below method will be used to read the data file matadata
*/
public static DataFileFooter readMetadatFile(TableBlockInfo tableBlockInfo) throws IOException {
- AbstractDataFileFooterConverter fileFooterConverter =
- DataFileFooterConverterFactory.getInstance()
- .getDataFileFooterConverter(tableBlockInfo.getVersion());
- return fileFooterConverter.readDataFileFooter(tableBlockInfo);
+ BlockletDetailInfo detailInfo = tableBlockInfo.getDetailInfo();
+ if (detailInfo == null) {
+ AbstractDataFileFooterConverter fileFooterConverter =
+ DataFileFooterConverterFactory.getInstance()
+ .getDataFileFooterConverter(tableBlockInfo.getVersion());
+ return fileFooterConverter.readDataFileFooter(tableBlockInfo);
+ } else {
+ DataFileFooter fileFooter = new DataFileFooter();
+ fileFooter.setSchemaUpdatedTimeStamp(detailInfo.getSchemaUpdatedTimeStamp());
+ ColumnarFormatVersion version =
+ ColumnarFormatVersion.valueOf(detailInfo.getVersionNumber());
+ AbstractDataFileFooterConverter dataFileFooterConverter =
+ DataFileFooterConverterFactory.getInstance().getDataFileFooterConverter(version);
+ fileFooter.setColumnInTable(dataFileFooterConverter.getSchema(tableBlockInfo));
+ SegmentInfo segmentInfo = new SegmentInfo();
+ segmentInfo.setColumnCardinality(detailInfo.getDimLens());
+ segmentInfo.setNumberOfColumns(detailInfo.getRowCount());
+ fileFooter.setSegmentInfo(segmentInfo);
+ return fileFooter;
+ }
}
/**
@@ -1553,24 +1572,23 @@ public final class CarbonUtil {
}
/**
- * @param tableInfo
* @param invalidBlockVOForSegmentId
* @param updateStatusMngr
* @return
*/
- public static boolean isInvalidTableBlock(TableBlockInfo tableInfo,
+ public static boolean isInvalidTableBlock(String segmentId, String filePath,
UpdateVO invalidBlockVOForSegmentId, SegmentUpdateStatusManager updateStatusMngr) {
- if (!updateStatusMngr.isBlockValid(tableInfo.getSegmentId(),
- CarbonTablePath.getCarbonDataFileName(tableInfo.getFilePath()) + CarbonTablePath
+ if (!updateStatusMngr.isBlockValid(segmentId,
+ CarbonTablePath.getCarbonDataFileName(filePath) + CarbonTablePath
.getCarbonDataExtension())) {
return true;
}
if (null != invalidBlockVOForSegmentId) {
- Long blockTimeStamp = Long.parseLong(tableInfo.getFilePath()
- .substring(tableInfo.getFilePath().lastIndexOf('-') + 1,
- tableInfo.getFilePath().lastIndexOf('.')));
+ Long blockTimeStamp = Long.parseLong(filePath
+ .substring(filePath.lastIndexOf('-') + 1,
+ filePath.lastIndexOf('.')));
if ((blockTimeStamp > invalidBlockVOForSegmentId.getFactTimestamp() && (
invalidBlockVOForSegmentId.getUpdateDeltaStartTimestamp() != null
&& blockTimeStamp < invalidBlockVOForSegmentId.getUpdateDeltaStartTimestamp()))) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
index 0f82b95..3ac6987 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
@@ -121,4 +121,8 @@ public class DataFileFooterConverter extends AbstractDataFileFooterConverter {
blockletInfo.setNumberOfRows(blockletInfoThrift.getNum_rows());
return blockletInfo;
}
+
+ @Override public List<ColumnSchema> getSchema(TableBlockInfo tableBlockInfo) throws IOException {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
index 4882b0f..8cd437f 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
@@ -140,4 +140,7 @@ public class DataFileFooterConverter2 extends AbstractDataFileFooterConverter {
return numberOfDimensionColumns;
}
+ @Override public List<ColumnSchema> getSchema(TableBlockInfo tableBlockInfo) throws IOException {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
index 143c1b1..ccb8b29 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
@@ -85,6 +85,17 @@ public class DataFileFooterConverterV3 extends AbstractDataFileFooterConverter {
return dataFileFooter;
}
+ @Override public List<ColumnSchema> getSchema(TableBlockInfo tableBlockInfo) throws IOException {
+ CarbonHeaderReader carbonHeaderReader = new CarbonHeaderReader(tableBlockInfo.getFilePath());
+ FileHeader fileHeader = carbonHeaderReader.readHeader();
+ List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
+ List<org.apache.carbondata.format.ColumnSchema> table_columns = fileHeader.getColumn_schema();
+ for (int i = 0; i < table_columns.size(); i++) {
+ columnSchemaList.add(thriftColumnSchmeaToWrapperColumnSchema(table_columns.get(i)));
+ }
+ return columnSchemaList;
+ }
+
/**
* Below method is to convert the blocklet info of the thrift to wrapper
* blocklet info
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/format/src/main/thrift/carbondata_index.thrift
----------------------------------------------------------------------
diff --git a/format/src/main/thrift/carbondata_index.thrift b/format/src/main/thrift/carbondata_index.thrift
index c055031..4df085a 100644
--- a/format/src/main/thrift/carbondata_index.thrift
+++ b/format/src/main/thrift/carbondata_index.thrift
@@ -41,4 +41,5 @@ struct BlockIndex{
2: required string file_name; // Block file name
3: required i64 offset; // Offset of the footer
4: required carbondata.BlockletIndex block_index; // Blocklet index
+ 5: optional carbondata.BlockletInfo3 blocklet_info;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index 1e69648..d03ae3a 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -19,7 +19,14 @@ package org.apache.carbondata.hadoop;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Constructor;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.DataRefNode;
@@ -367,8 +374,9 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
if (isIUDTable) {
// In case IUD is not performed in this table avoid searching for
// invalidated blocks.
- if (CarbonUtil.isInvalidTableBlock(tableBlockInfo, invalidBlockVOForSegmentId,
- updateStatusManager)) {
+ if (CarbonUtil
+ .isInvalidTableBlock(tableBlockInfo.getSegmentId(), tableBlockInfo.getFilePath(),
+ invalidBlockVOForSegmentId, updateStatusManager)) {
continue;
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index 631bc2c..56bade7 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -29,6 +29,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.block.BlockletInfos;
import org.apache.carbondata.core.datastore.block.Distributable;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.mutate.UpdateVO;
import org.apache.carbondata.core.util.ByteUtil;
@@ -77,6 +78,8 @@ public class CarbonInputSplit extends FileSplit
*/
private String[] deleteDeltaFiles;
+ private BlockletDetailInfo detailInfo;
+
public CarbonInputSplit() {
segmentId = null;
taskId = "0";
@@ -138,10 +141,12 @@ public class CarbonInputSplit extends FileSplit
BlockletInfos blockletInfos =
new BlockletInfos(split.getNumberOfBlocklets(), 0, split.getNumberOfBlocklets());
try {
- tableBlockInfoList.add(
+ TableBlockInfo blockInfo =
new TableBlockInfo(split.getPath().toString(), split.getStart(), split.getSegmentId(),
split.getLocations(), split.getLength(), blockletInfos, split.getVersion(),
- split.getDeleteDeltaFiles()));
+ split.getDeleteDeltaFiles());
+ blockInfo.setDetailInfo(split.getDetailInfo());
+ tableBlockInfoList.add(blockInfo);
} catch (IOException e) {
throw new RuntimeException("fail to get location of split: " + split, e);
}
@@ -153,9 +158,12 @@ public class CarbonInputSplit extends FileSplit
BlockletInfos blockletInfos =
new BlockletInfos(inputSplit.getNumberOfBlocklets(), 0, inputSplit.getNumberOfBlocklets());
try {
- return new TableBlockInfo(inputSplit.getPath().toString(), inputSplit.getStart(),
- inputSplit.getSegmentId(), inputSplit.getLocations(), inputSplit.getLength(),
- blockletInfos, inputSplit.getVersion(), inputSplit.getDeleteDeltaFiles());
+ TableBlockInfo blockInfo =
+ new TableBlockInfo(inputSplit.getPath().toString(), inputSplit.getStart(),
+ inputSplit.getSegmentId(), inputSplit.getLocations(), inputSplit.getLength(),
+ blockletInfos, inputSplit.getVersion(), inputSplit.getDeleteDeltaFiles());
+ blockInfo.setDetailInfo(inputSplit.getDetailInfo());
+ return blockInfo;
} catch (IOException e) {
throw new RuntimeException("fail to get location of split: " + inputSplit, e);
}
@@ -180,6 +188,11 @@ public class CarbonInputSplit extends FileSplit
for (int i = 0; i < numberOfDeleteDeltaFiles; i++) {
deleteDeltaFiles[i] = in.readUTF();
}
+ boolean detailInfoExists = in.readBoolean();
+ if (detailInfoExists) {
+ detailInfo = new BlockletDetailInfo();
+ detailInfo.readFields(in);
+ }
}
@Override public void write(DataOutput out) throws IOException {
@@ -197,6 +210,10 @@ public class CarbonInputSplit extends FileSplit
out.writeUTF(deleteDeltaFiles[i]);
}
}
+ out.writeBoolean(detailInfo != null);
+ if (detailInfo != null) {
+ detailInfo.write(out);
+ }
}
public List<String> getInvalidSegments() {
@@ -310,4 +327,16 @@ public class CarbonInputSplit extends FileSplit
public String[] getDeleteDeltaFiles() {
return deleteDeltaFiles;
}
+
+ public void setDeleteDeltaFiles(String[] deleteDeltaFiles) {
+ this.deleteDeltaFiles = deleteDeltaFiles;
+ }
+
+ public BlockletDetailInfo getDetailInfo() {
+ return detailInfo;
+ }
+
+ public void setDetailInfo(BlockletDetailInfo detailInfo) {
+ this.detailInfo = detailInfo;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index ae9c676..e73c04a 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -18,152 +18,556 @@
package org.apache.carbondata.hadoop.api;
import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.TableSegmentUniqueIdentifier;
+import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.indexstore.DataMapStoreManager;
+import org.apache.carbondata.core.indexstore.DataMapType;
+import org.apache.carbondata.core.indexstore.TableDataMap;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
+import org.apache.carbondata.core.mutate.UpdateVO;
+import org.apache.carbondata.core.mutate.data.BlockMappingVO;
import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.partition.PartitionUtil;
+import org.apache.carbondata.core.scan.partition.Partitioner;
+import org.apache.carbondata.core.stats.QueryStatistic;
+import org.apache.carbondata.core.stats.QueryStatisticsConstants;
+import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonStorePath;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
import org.apache.carbondata.hadoop.CarbonProjection;
-import org.apache.carbondata.hadoop.internal.CarbonInputSplit;
-import org.apache.carbondata.hadoop.internal.segment.Segment;
-import org.apache.carbondata.hadoop.internal.segment.SegmentManager;
-import org.apache.carbondata.hadoop.internal.segment.SegmentManagerFactory;
+import org.apache.carbondata.hadoop.CarbonRecordReader;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
+import org.apache.carbondata.hadoop.util.SchemaReader;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.InvalidPathException;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.util.StringUtils;
/**
* Input format of CarbonData file.
+ *
* @param <T>
*/
public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
+ // comma separated list of input segment numbers
+ public static final String INPUT_SEGMENT_NUMBERS =
+ "mapreduce.input.carboninputformat.segmentnumbers";
+ // comma separated list of input files
+ public static final String INPUT_FILES = "mapreduce.input.carboninputformat.files";
+ private static final Log LOG = LogFactory.getLog(CarbonTableInputFormat.class);
private static final String FILTER_PREDICATE =
"mapreduce.input.carboninputformat.filter.predicate";
+ private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection";
+ private static final String CARBON_TABLE = "mapreduce.input.carboninputformat.table";
+ private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
- private SegmentManager segmentManager;
+ /**
+ * It is optional, if user does not set then it reads from store
+ *
+ * @param configuration
+ * @param carbonTable
+ * @throws IOException
+ */
+ public static void setCarbonTable(Configuration configuration, CarbonTable carbonTable)
+ throws IOException {
+ if (null != carbonTable) {
+ configuration.set(CARBON_TABLE, ObjectSerializationUtil.convertObjectToString(carbonTable));
+ }
+ }
- public CarbonTableInputFormat() {
- this.segmentManager = SegmentManagerFactory.getGlobalSegmentManager();
+ public static CarbonTable getCarbonTable(Configuration configuration) throws IOException {
+ String carbonTableStr = configuration.get(CARBON_TABLE);
+ if (carbonTableStr == null) {
+ populateCarbonTable(configuration);
+ // read it from schema file in the store
+ carbonTableStr = configuration.get(CARBON_TABLE);
+ return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr);
+ }
+ return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr);
}
- @Override
- public RecordReader<Void, T> createRecordReader(InputSplit split,
- TaskAttemptContext context) throws IOException, InterruptedException {
- switch (((CarbonInputSplit)split).formatType()) {
- case COLUMNAR:
- // TODO: create record reader for columnar format
- break;
- default:
- throw new RuntimeException("Unsupported format type");
+ /**
+ * this method will read the schema from the physical file and populate into CARBON_TABLE
+ *
+ * @param configuration
+ * @throws IOException
+ */
+ private static void populateCarbonTable(Configuration configuration) throws IOException {
+ String dirs = configuration.get(INPUT_DIR, "");
+ String[] inputPaths = StringUtils.split(dirs);
+ if (inputPaths.length == 0) {
+ throw new InvalidPathException("No input paths specified in job");
}
- return null;
+ AbsoluteTableIdentifier absoluteTableIdentifier =
+ AbsoluteTableIdentifier.fromTablePath(inputPaths[0]);
+ // read the schema file to get the absoluteTableIdentifier having the correct table id
+ // persisted in the schema
+ CarbonTable carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier);
+ setCarbonTable(configuration, carbonTable);
}
- @Override
- public List<InputSplit> getSplits(JobContext job) throws IOException {
+ public static void setTablePath(Configuration configuration, String tablePath)
+ throws IOException {
+ configuration.set(FileInputFormat.INPUT_DIR, tablePath);
+ }
- // work as following steps:
- // get all current valid segment
- // for each segment, get all input split
+ /**
+ * It sets unresolved filter expression.
+ *
+ * @param configuration
+ * @param filterExpression
+ */
+ public static void setFilterPredicates(Configuration configuration, Expression filterExpression) {
+ if (filterExpression == null) {
+ return;
+ }
+ try {
+ String filterString = ObjectSerializationUtil.convertObjectToString(filterExpression);
+ configuration.set(FILTER_PREDICATE, filterString);
+ } catch (Exception e) {
+ throw new RuntimeException("Error while setting filter expression to Job", e);
+ }
+ }
- List<InputSplit> output = new LinkedList<>();
- Expression filter = getFilter(job.getConfiguration());
- Segment[] segments = segmentManager.getAllValidSegments();
- FilterResolverIntf filterResolver = CarbonInputFormatUtil.resolveFilter(filter, null);
- for (Segment segment: segments) {
- List<InputSplit> splits = segment.getSplits(job, filterResolver);
- output.addAll(splits);
+ public static void setColumnProjection(Configuration configuration, CarbonProjection projection) {
+ if (projection == null || projection.isEmpty()) {
+ return;
+ }
+ String[] allColumns = projection.getAllColumns();
+ StringBuilder builder = new StringBuilder();
+ for (String column : allColumns) {
+ builder.append(column).append(",");
}
- return output;
+ String columnString = builder.toString();
+ columnString = columnString.substring(0, columnString.length() - 1);
+ configuration.set(COLUMN_PROJECTION, columnString);
}
- /**
- * set the table path into configuration
- * @param conf configuration of the job
- * @param tablePath table path string
- */
- public void setTablePath(Configuration conf, String tablePath) {
+ public static String getColumnProjection(Configuration configuration) {
+ return configuration.get(COLUMN_PROJECTION);
+ }
+
+ public static void setCarbonReadSupport(Configuration configuration,
+ Class<? extends CarbonReadSupport> readSupportClass) {
+ if (readSupportClass != null) {
+ configuration.set(CARBON_READ_SUPPORT, readSupportClass.getName());
+ }
+ }
+ private static CarbonTablePath getTablePath(AbsoluteTableIdentifier absIdentifier) {
+ return CarbonStorePath.getCarbonTablePath(absIdentifier);
}
/**
- * return the table path in the configuration
- * @param conf configuration of the job
- * @return table path string
+ * Set list of segments to access
*/
- public String getTablePath(Configuration conf) {
- return null;
+ public static void setSegmentsToAccess(Configuration configuration, List<String> validSegments) {
+ configuration.set(INPUT_SEGMENT_NUMBERS, CarbonUtil.getSegmentString(validSegments));
}
/**
- * set projection columns into configuration
- * @param conf configuration of the job
- * @param projection projection
+ * Set list of files to access
*/
- public void setProjection(Configuration conf, CarbonProjection projection) {
+ public static void setFilesToAccess(Configuration configuration, List<String> validFiles) {
+ configuration.set(INPUT_FILES, CarbonUtil.getSegmentString(validFiles));
+ }
+ private static AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
+ throws IOException {
+ return getCarbonTable(configuration).getAbsoluteTableIdentifier();
}
/**
- * return the projection in the configuration
- * @param conf configuration of the job
- * @return projection
+ * {@inheritDoc}
+ * Configurations FileInputFormat.INPUT_DIR
+ * are used to get table path to read.
+ *
+ * @param job
+ * @return List<InputSplit> list of CarbonInputSplit
+ * @throws IOException
*/
- public CarbonProjection getProjection(Configuration conf) {
- return null;
+ @Override public List<InputSplit> getSplits(JobContext job) throws IOException {
+ AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
+ TableDataMap blockletMap =
+ DataMapStoreManager.getInstance().getDataMap(identifier, "blocklet", DataMapType.BLOCKLET);
+ List<String> invalidSegments = new ArrayList<>();
+ List<UpdateVO> invalidTimestampsList = new ArrayList<>();
+ List<String> validSegments = Arrays.asList(getSegmentsToAccess(job));
+ // get all valid segments and set them into the configuration
+ if (validSegments.size() == 0) {
+ SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
+ SegmentStatusManager.ValidAndInvalidSegmentsInfo segments =
+ segmentStatusManager.getValidAndInvalidSegments();
+ SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier);
+ validSegments = segments.getValidSegments();
+ if (validSegments.size() == 0) {
+ return new ArrayList<>(0);
+ }
+
+ // remove entry in the segment index if there are invalid segments
+ invalidSegments.addAll(segments.getInvalidSegments());
+ for (String invalidSegmentId : invalidSegments) {
+ invalidTimestampsList.add(updateStatusManager.getInvalidTimestampRange(invalidSegmentId));
+ }
+ if (invalidSegments.size() > 0) {
+ List<TableSegmentUniqueIdentifier> invalidSegmentsIds =
+ new ArrayList<>(invalidSegments.size());
+ for (String segId : invalidSegments) {
+ invalidSegmentsIds.add(new TableSegmentUniqueIdentifier(identifier, segId));
+ }
+ blockletMap.clear(invalidSegments);
+ }
+ }
+
+ // process and resolve the expression
+ Expression filter = getFilterPredicates(job.getConfiguration());
+ CarbonTable carbonTable = getCarbonTable(job.getConfiguration());
+ // this will be null in case of corrupt schema file.
+ if (null == carbonTable) {
+ throw new IOException("Missing/Corrupt schema file for table.");
+ }
+
+ CarbonInputFormatUtil.processFilterExpression(filter, carbonTable);
+
+ // prune partitions for filter query on partition table
+ BitSet matchedPartitions = null;
+ if (null != filter) {
+ PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName());
+ if (null != partitionInfo) {
+ Partitioner partitioner = PartitionUtil.getPartitioner(partitionInfo);
+ matchedPartitions = new FilterExpressionProcessor()
+ .getFilteredPartitions(filter, partitionInfo, partitioner);
+ if (matchedPartitions.cardinality() == 0) {
+ // no partition is required
+ return new ArrayList<InputSplit>();
+ }
+ if (matchedPartitions.cardinality() == partitioner.numPartitions()) {
+ // all partitions are required, no need to prune partitions
+ matchedPartitions = null;
+ }
+ }
+ }
+
+ FilterResolverIntf filterInterface = CarbonInputFormatUtil.resolveFilter(filter, identifier);
+
+ // do block filtering and get split
+ List<InputSplit> splits = getSplits(job, filterInterface, validSegments, matchedPartitions);
+ // pass the invalid segment to task side in order to remove index entry in task side
+ if (invalidSegments.size() > 0) {
+ for (InputSplit split : splits) {
+ ((org.apache.carbondata.hadoop.CarbonInputSplit) split).setInvalidSegments(invalidSegments);
+ ((org.apache.carbondata.hadoop.CarbonInputSplit) split)
+ .setInvalidTimestampRange(invalidTimestampsList);
+ }
+ }
+ return splits;
}
/**
- * set filter expression into the configuration
- * @param conf configuration of the job
- * @param filter filter expression
+ * {@inheritDoc}
+ * Configurations FileInputFormat.INPUT_DIR, CarbonInputFormat.INPUT_SEGMENT_NUMBERS
+ * are used to get table path to read.
+ *
+ * @return
+ * @throws IOException
*/
- public void setFilter(Configuration conf, Expression filter) {
+ private List<InputSplit> getSplits(JobContext job, FilterResolverIntf filterResolver,
+ List<String> validSegments, BitSet matchedPartitions) throws IOException {
+
+ List<InputSplit> result = new LinkedList<InputSplit>();
+ UpdateVO invalidBlockVOForSegmentId = null;
+ Boolean isIUDTable = false;
+
+ AbsoluteTableIdentifier absoluteTableIdentifier =
+ getCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier();
+ SegmentUpdateStatusManager updateStatusManager =
+ new SegmentUpdateStatusManager(absoluteTableIdentifier);
+
+ isIUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0);
+
+ //for each segment fetch blocks matching filter in Driver BTree
+ List<org.apache.carbondata.hadoop.CarbonInputSplit> dataBlocksOfSegment =
+ getDataBlocksOfSegment(job, absoluteTableIdentifier, filterResolver, matchedPartitions,
+ validSegments);
+ for (org.apache.carbondata.hadoop.CarbonInputSplit inputSplit : dataBlocksOfSegment) {
+
+ // Get the UpdateVO for those tables on which IUD operations being performed.
+ if (isIUDTable) {
+ invalidBlockVOForSegmentId =
+ updateStatusManager.getInvalidTimestampRange(inputSplit.getSegmentId());
+ }
+ if (isIUDTable) {
+ // IUD operations have been performed on this table, so check whether
+ // this block has been invalidated and skip it if so.
+ if (CarbonUtil
+ .isInvalidTableBlock(inputSplit.getSegmentId(), inputSplit.getPath().toString(),
+ invalidBlockVOForSegmentId, updateStatusManager)) {
+ continue;
+ }
+ }
+ String[] deleteDeltaFilePath = null;
+ try {
+ deleteDeltaFilePath =
+ updateStatusManager.getDeleteDeltaFilePath(inputSplit.getPath().toString());
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ inputSplit.setDeleteDeltaFiles(deleteDeltaFilePath);
+ result.add(inputSplit);
+ }
+ return result;
+ }
+
+ protected Expression getFilterPredicates(Configuration configuration) {
try {
- String filterString = ObjectSerializationUtil.convertObjectToString(filter);
- conf.set(FILTER_PREDICATE, filterString);
- } catch (Exception e) {
- throw new RuntimeException("Error while setting filter expression to Job", e);
+ String filterExprString = configuration.get(FILTER_PREDICATE);
+ if (filterExprString == null) {
+ return null;
+ }
+ Object filter = ObjectSerializationUtil.convertStringToObject(filterExprString);
+ return (Expression) filter;
+ } catch (IOException e) {
+ throw new RuntimeException("Error while reading filter expression", e);
}
}
/**
- * return filter expression in the configuration
- * @param conf configuration of the job
- * @return filter expression
+ * get data blocks of given segment
*/
- public Expression getFilter(Configuration conf) {
- Object filter;
- String filterExprString = conf.get(FILTER_PREDICATE);
- if (filterExprString == null) {
- return null;
+ private List<org.apache.carbondata.hadoop.CarbonInputSplit> getDataBlocksOfSegment(JobContext job,
+ AbsoluteTableIdentifier absoluteTableIdentifier, FilterResolverIntf resolver,
+ BitSet matchedPartitions, List<String> segmentIds) throws IOException {
+
+ QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder();
+ QueryStatistic statistic = new QueryStatistic();
+
+ // get tokens for all the required FileSystem for table path
+ TokenCache.obtainTokensForNamenodes(job.getCredentials(),
+ new Path[] { new Path(absoluteTableIdentifier.getTablePath()) }, job.getConfiguration());
+
+ TableDataMap blockletMap = DataMapStoreManager.getInstance()
+ .getDataMap(absoluteTableIdentifier, "blocklet", DataMapType.BLOCKLET);
+ List<Blocklet> prunedBlocklets = blockletMap.prune(segmentIds, resolver);
+
+ List<org.apache.carbondata.hadoop.CarbonInputSplit> resultFilterredBlocks = new ArrayList<>();
+ for (Blocklet blocklet : prunedBlocklets) {
+ int taskId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(
+ CarbonTablePath.DataFileUtil.getTaskNo(blocklet.getPath().toString()));
+
+ // matchedPartitions variable will be null in two cases as follows
+ // 1. the table is not a partition table
+ // 2. the table is a partition table, and all partitions are matched by query
+ // for partition table, the task id of carbondata file name is the partition id.
+ // if this partition is not required, here will skip it.
+ if (matchedPartitions == null || matchedPartitions.get(taskId)) {
+ resultFilterredBlocks.add(convertToCarbonInputSplit(blocklet));
+ }
}
+ statistic
+ .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis());
+ recorder.recordStatisticsForDriver(statistic, job.getConfiguration().get("query.id"));
+ return resultFilterredBlocks;
+ }
+
+ private org.apache.carbondata.hadoop.CarbonInputSplit convertToCarbonInputSplit(Blocklet blocklet)
+ throws IOException {
+ blocklet.updateLocations();
+ org.apache.carbondata.hadoop.CarbonInputSplit split =
+ org.apache.carbondata.hadoop.CarbonInputSplit.from(blocklet.getSegmentId(),
+ new FileSplit(blocklet.getPath(), 0, blocklet.getLength(), blocklet.getLocations()),
+ ColumnarFormatVersion.valueOf((short) blocklet.getDetailInfo().getVersionNumber()));
+ split.setDetailInfo(blocklet.getDetailInfo());
+ return split;
+ }
+
+ @Override public RecordReader<Void, T> createRecordReader(InputSplit inputSplit,
+ TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+ Configuration configuration = taskAttemptContext.getConfiguration();
+ QueryModel queryModel = getQueryModel(inputSplit, taskAttemptContext);
+ CarbonReadSupport<T> readSupport = getReadSupportClass(configuration);
+ return new CarbonRecordReader<T>(queryModel, readSupport);
+ }
+
+ public QueryModel getQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+ throws IOException {
+ Configuration configuration = taskAttemptContext.getConfiguration();
+ CarbonTable carbonTable = getCarbonTable(configuration);
+ // getting the table absoluteTableIdentifier from the carbonTable
+ // to avoid unnecessary deserialization
+ AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
+
+ // query plan includes projection column
+ String projection = getColumnProjection(configuration);
+ CarbonQueryPlan queryPlan = CarbonInputFormatUtil.createQueryPlan(carbonTable, projection);
+ QueryModel queryModel = QueryModel.createModel(identifier, queryPlan, carbonTable);
+
+ // set the filter to the query model in order to filter blocklet before scan
+ Expression filter = getFilterPredicates(configuration);
+ CarbonInputFormatUtil.processFilterExpression(filter, carbonTable);
+ FilterResolverIntf filterIntf = CarbonInputFormatUtil.resolveFilter(filter, identifier);
+ queryModel.setFilterExpressionResolverTree(filterIntf);
+
+ // update the file level index store if there are invalid segment
+ if (inputSplit instanceof CarbonMultiBlockSplit) {
+ CarbonMultiBlockSplit split = (CarbonMultiBlockSplit) inputSplit;
+ List<String> invalidSegments = split.getAllSplits().get(0).getInvalidSegments();
+ if (invalidSegments.size() > 0) {
+ queryModel.setInvalidSegmentIds(invalidSegments);
+ }
+ List<UpdateVO> invalidTimestampRangeList =
+ split.getAllSplits().get(0).getInvalidTimestampRange();
+ if ((null != invalidTimestampRangeList) && (invalidTimestampRangeList.size() > 0)) {
+ queryModel.setInvalidBlockForSegmentId(invalidTimestampRangeList);
+ }
+ }
+ return queryModel;
+ }
+
+ public CarbonReadSupport<T> getReadSupportClass(Configuration configuration) {
+ String readSupportClass = configuration.get(CARBON_READ_SUPPORT);
+ //By default it uses dictionary decoder read class
+ CarbonReadSupport<T> readSupport = null;
+ if (readSupportClass != null) {
+ try {
+ Class<?> myClass = Class.forName(readSupportClass);
+ Constructor<?> constructor = myClass.getConstructors()[0];
+ Object object = constructor.newInstance();
+ if (object instanceof CarbonReadSupport) {
+ readSupport = (CarbonReadSupport) object;
+ }
+ } catch (ClassNotFoundException ex) {
+ LOG.error("Class " + readSupportClass + "not found", ex);
+ } catch (Exception ex) {
+ LOG.error("Error while creating " + readSupportClass, ex);
+ }
+ } else {
+ readSupport = new DictionaryDecodeReadSupport<>();
+ }
+ return readSupport;
+ }
+
+ @Override protected boolean isSplitable(JobContext context, Path filename) {
try {
- filter = ObjectSerializationUtil.convertStringToObject(filterExprString);
- } catch (IOException e) {
- throw new RuntimeException("Error while reading filter expression", e);
+ // Don't split the file if it is local file system
+ FileSystem fileSystem = filename.getFileSystem(context.getConfiguration());
+ if (fileSystem instanceof LocalFileSystem) {
+ return false;
+ }
+ } catch (Exception e) {
+ return true;
+ }
+ return true;
+ }
+
+ /**
+ * required to be moved to core
+ *
+ * @return updateExtension
+ */
+ private String getUpdateExtension() {
+ // TODO: required to modify when supporting update, mostly will be update timestamp
+ return "update";
+ }
+
+ /**
+ * return valid segment to access
+ */
+ private String[] getSegmentsToAccess(JobContext job) {
+ String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, "");
+ if (segmentString.trim().isEmpty()) {
+ return new String[0];
}
- assert (filter instanceof Expression);
- return (Expression) filter;
+ return segmentString.split(",");
}
/**
- * Optional API. It can be used by query optimizer to select index based on filter
- * in the configuration of the job. After selecting index internally, index' name will be set
- * in the configuration.
+ * Get the row count of each block and the mapping of segment to block count.
*
- * The process of selection is simple, just use the default index. Subclass can provide a more
- * advanced selection logic like cost based.
- * @param conf job configuration
+ * @param job
+ * @param identifier
+ * @return
+ * @throws IOException
+ * @throws KeyGenException
*/
- public void selectIndex(Configuration conf) {
- // set the default index in configuration
+ public BlockMappingVO getBlockRowCount(JobContext job, AbsoluteTableIdentifier identifier)
+ throws IOException, KeyGenException {
+ TableDataMap blockletMap =
+ DataMapStoreManager.getInstance().getDataMap(identifier, "blocklet", DataMapType.BLOCKLET);
+ SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier);
+ SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegments =
+ new SegmentStatusManager(identifier).getValidAndInvalidSegments();
+ Map<String, Long> blockRowCountMapping =
+ new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ Map<String, Long> segmentAndBlockCountMapping =
+ new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ List<Blocklet> blocklets = blockletMap.prune(validAndInvalidSegments.getValidSegments(), null);
+ for (Blocklet blocklet : blocklets) {
+ String blockName = blocklet.getPath().toString();
+ blockName = CarbonTablePath.getCarbonDataFileName(blockName);
+ blockName = blockName + CarbonTablePath.getCarbonDataExtension();
+
+ long rowCount = blocklet.getDetailInfo().getRowCount();
+
+ String key = CarbonUpdateUtil.getSegmentBlockNameKey(blocklet.getSegmentId(), blockName);
+
+ // if the block is invalid then don't add the count
+ SegmentUpdateDetails details = updateStatusManager.getDetailsForABlock(key);
+
+ if (null == details || !CarbonUpdateUtil.isBlockInvalid(details.getStatus())) {
+ Long blockCount = blockRowCountMapping.get(key);
+ if (blockCount == null) {
+ blockCount = 0L;
+ Long count = segmentAndBlockCountMapping.get(blocklet.getSegmentId());
+ if (count == null) {
+ count = 0L;
+ }
+ segmentAndBlockCountMapping.put(blocklet.getSegmentId(), count + 1);
+ }
+ blockCount += rowCount;
+ blockRowCountMapping.put(key, blockCount);
+ }
+ }
+ return new BlockMappingVO(blockRowCountMapping, segmentAndBlockCountMapping);
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index 8270304..8269757 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -34,7 +34,7 @@ import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
import org.apache.carbondata.core.scan.model.QueryDimension;
import org.apache.carbondata.core.scan.model.QueryMeasure;
import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.hadoop.CarbonInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
@@ -77,9 +77,10 @@ public class CarbonInputFormatUtil {
return plan;
}
- public static <V> CarbonInputFormat<V> createCarbonInputFormat(AbsoluteTableIdentifier identifier,
+ public static <V> CarbonTableInputFormat<V> createCarbonInputFormat(
+ AbsoluteTableIdentifier identifier,
Job job) throws IOException {
- CarbonInputFormat<V> carbonInputFormat = new CarbonInputFormat<>();
+ CarbonTableInputFormat<V> carbonInputFormat = new CarbonTableInputFormat<>();
FileInputFormat.addInputPath(job, new Path(identifier.getTablePath()));
return carbonInputFormat;
}
[3/3] carbondata git commit: [CARBONDATA-1232] Datamap implementation
for Blocklet
Posted by ja...@apache.org.
[CARBONDATA-1232] Datamap implementation for Blocklet
This closes #1099
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6d71d9c4
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6d71d9c4
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6d71d9c4
Branch: refs/heads/datamap
Commit: 6d71d9c474e50792fc0fba3a321c2de927b05c84
Parents: 3ecb3ec
Author: ravipesala <ra...@gmail.com>
Authored: Sat Jun 17 22:53:57 2017 +0530
Committer: jackylk <ja...@huawei.com>
Committed: Tue Jul 11 13:17:43 2017 +0800
----------------------------------------------------------------------
.../carbondata/core/cache/CacheProvider.java | 3 +
.../apache/carbondata/core/cache/CacheType.java | 6 +
.../core/datastore/block/TableBlockInfo.java | 19 +
.../core/datastore/block/TaskBlockInfo.java | 4 +
.../carbondata/core/indexstore/Blocklet.java | 55 +-
.../indexstore/BlockletDataMapIndexStore.java | 180 ++++++
.../core/indexstore/BlockletDetailInfo.java | 117 ++++
.../carbondata/core/indexstore/DataMap.java | 8 +-
.../core/indexstore/DataMapFactory.java | 87 +++
.../core/indexstore/DataMapStoreManager.java | 90 ++-
.../carbondata/core/indexstore/DataMapType.java | 14 +-
.../TableBlockIndexUniqueIdentifier.java | 103 ++++
.../core/indexstore/TableDataMap.java | 97 +++-
.../core/indexstore/UnsafeMemoryDMStore.java | 207 +++++++
.../blockletindex/BlockletDMComparator.java | 134 +++++
.../blockletindex/BlockletDataMap.java | 445 +++++++++++++++
.../blockletindex/BlockletDataMapFactory.java | 115 ++++
.../BlockletDataRefNodeWrapper.java | 137 +++++
.../indexstore/blockletindex/IndexWrapper.java | 49 ++
.../core/indexstore/row/DataMapRow.java | 89 +++
.../core/indexstore/row/DataMapRowImpl.java | 106 ++++
.../core/indexstore/row/UnsafeDataMapRow.java | 133 +++++
.../core/indexstore/schema/DataMapSchema.java | 124 ++++
.../core/indexstore/schema/FilterType.java | 24 +
.../core/metadata/blocklet/BlockletInfo.java | 53 +-
.../core/metadata/index/BlockIndexInfo.java | 27 +
.../executor/impl/AbstractQueryExecutor.java | 52 +-
.../executer/IncludeFilterExecuterImpl.java | 2 +-
.../executer/RangeValueFilterExecuterImpl.java | 2 +-
.../RowLevelRangeGrtThanFiterExecuterImpl.java | 2 +-
...elRangeGrtrThanEquaToFilterExecuterImpl.java | 2 +-
...velRangeLessThanEqualFilterExecuterImpl.java | 2 +-
.../RowLevelRangeLessThanFiterExecuterImpl.java | 2 +-
.../processor/AbstractDataBlockIterator.java | 3 +
.../AbstractDetailQueryResultIterator.java | 34 +-
.../util/AbstractDataFileFooterConverter.java | 53 ++
.../apache/carbondata/core/util/CarbonUtil.java | 40 +-
.../core/util/DataFileFooterConverter.java | 4 +
.../core/util/DataFileFooterConverter2.java | 3 +
.../core/util/DataFileFooterConverterV3.java | 11 +
format/src/main/thrift/carbondata_index.thrift | 1 +
.../carbondata/hadoop/CarbonInputFormat.java | 14 +-
.../carbondata/hadoop/CarbonInputSplit.java | 39 +-
.../hadoop/api/CarbonTableInputFormat.java | 562 ++++++++++++++++---
.../hadoop/util/CarbonInputFormatUtil.java | 7 +-
.../presto/impl/CarbonTableReader.java | 56 +-
.../spark/rdd/CarbonIUDMergerRDD.scala | 5 +-
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 9 +-
.../carbondata/spark/rdd/CarbonScanRDD.scala | 20 +-
.../carbondata/spark/util/QueryPlanUtil.scala | 10 +-
.../sql/CarbonDatasourceHadoopRelation.scala | 13 +-
.../sql/execution/command/IUDCommands.scala | 7 -
.../carbondata/spark/util/QueryPlanUtil.scala | 10 +-
.../apache/spark/sql/hive/CarbonMetastore.scala | 10 +-
.../processing/merger/CarbonCompactionUtil.java | 32 ++
55 files changed, 3172 insertions(+), 261 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java b/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
index 25a8976..5c4b265 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.datastore.BlockIndexStore;
import org.apache.carbondata.core.datastore.SegmentTaskIndexStore;
import org.apache.carbondata.core.datastore.block.AbstractIndex;
import org.apache.carbondata.core.datastore.block.TableBlockUniqueIdentifier;
+import org.apache.carbondata.core.indexstore.BlockletDataMapIndexStore;
import org.apache.carbondata.core.util.CarbonProperties;
/**
@@ -126,6 +127,8 @@ public class CacheProvider {
} else if (cacheType.equals(cacheType.DRIVER_BTREE)) {
cacheObject =
new SegmentTaskIndexStore(carbonStorePath, carbonLRUCache);
+ } else if (cacheType.equals(cacheType.DRIVER_BLOCKLET_DATAMAP)) {
+ cacheObject = new BlockletDataMapIndexStore(carbonStorePath, carbonLRUCache);
}
cacheTypeToCacheMap.put(cacheType, cacheObject);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java b/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java
index 2d6570d..ab51ff2 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java
@@ -56,6 +56,12 @@ public class CacheType<K, V> {
DRIVER_BTREE = new CacheType("driver_btree");
/**
+ * Driver blocklet datamap cache which maintains the blocklet datamap metadata
+ */
+ public static final CacheType<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper>
+ DRIVER_BLOCKLET_DATAMAP = new CacheType("driver_blocklet_datamap");
+
+ /**
* cacheName which is unique name for a cache
*/
private String cacheName;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
index 44347cf..f003882 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
@@ -22,6 +22,7 @@ import java.util.Map;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
@@ -77,6 +78,8 @@ public class TableBlockInfo implements Distributable, Serializable {
*/
private String[] deletedDeltaFilePath;
+ private BlockletDetailInfo detailInfo;
+
public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations,
long blockLength, ColumnarFormatVersion version, String[] deletedDeltaFilePath) {
this.filePath = FileFactory.getUpdatedFilePath(filePath);
@@ -88,6 +91,10 @@ public class TableBlockInfo implements Distributable, Serializable {
this.deletedDeltaFilePath = deletedDeltaFilePath;
}
+ public TableBlockInfo() {
+
+ }
+
/**
* constructor to initialize the TbaleBlockInfo with BlockletInfos
*
@@ -319,4 +326,16 @@ public class TableBlockInfo implements Distributable, Serializable {
public String[] getDeletedDeltaFilePath() {
return deletedDeltaFilePath;
}
+
+ public void setFilePath(String filePath) {
+ this.filePath = filePath;
+ }
+
+ public BlockletDetailInfo getDetailInfo() {
+ return detailInfo;
+ }
+
+ public void setDetailInfo(BlockletDetailInfo detailInfo) {
+ this.detailInfo = detailInfo;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/datastore/block/TaskBlockInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/TaskBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/TaskBlockInfo.java
index eb707c2..4fcec87 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/TaskBlockInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/TaskBlockInfo.java
@@ -17,6 +17,7 @@
package org.apache.carbondata.core.datastore.block;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -45,6 +46,9 @@ public class TaskBlockInfo {
return taskBlockInfoMapping.keySet();
}
+ public Collection<List<TableBlockInfo>> getAllTableBlockInfoList() {
+ return taskBlockInfoMapping.values();
+ }
/**
* returns TableBlockInfoList of given task
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
index 597c46c..66da4d0 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
@@ -16,27 +16,76 @@
*/
package org.apache.carbondata.core.indexstore;
+import java.io.IOException;
import java.io.Serializable;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+
/**
* Blocklet
*/
public class Blocklet implements Serializable {
- private String path;
+ private Path path;
+
+ private String segmentId;
private String blockletId;
+ private BlockletDetailInfo detailInfo;
+
+ private long length;
+
+ private String[] location;
+
public Blocklet(String path, String blockletId) {
- this.path = path;
+ this.path = new Path(path);
this.blockletId = blockletId;
}
- public String getPath() {
+ public Path getPath() {
return path;
}
public String getBlockletId() {
return blockletId;
}
+
+ public BlockletDetailInfo getDetailInfo() {
+ return detailInfo;
+ }
+
+ public void setDetailInfo(BlockletDetailInfo detailInfo) {
+ this.detailInfo = detailInfo;
+ }
+
+ public void updateLocations() throws IOException {
+ FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
+ RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
+ LocatedFileStatus fileStatus = iter.next();
+ location = fileStatus.getBlockLocations()[0].getHosts();
+ length = fileStatus.getLen();
+ }
+
+ public String[] getLocations() throws IOException {
+ return location;
+ }
+
+ public long getLength() throws IOException {
+ return length;
+ }
+
+ public String getSegmentId() {
+ return segmentId;
+ }
+
+ public void setSegmentId(String segmentId) {
+ this.segmentId = segmentId;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
new file mode 100644
index 0000000..fc8c273
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CarbonLRUCache;
+import org.apache.carbondata.core.datastore.exception.IndexBuilderException;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
+
+/**
+ * Class to handle loading, unloading,clearing,storing of the table
+ * blocks
+ */
+public class BlockletDataMapIndexStore
+ implements Cache<TableBlockIndexUniqueIdentifier, BlockletDataMap> {
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(BlockletDataMapIndexStore.class.getName());
+ /**
+ * carbon store path
+ */
+ protected String carbonStorePath;
+ /**
+ * CarbonLRU cache
+ */
+ protected CarbonLRUCache lruCache;
+
+ /**
+ * map of block info to lock object map, while loading the btree this will be filled
+ * and removed after loading the tree for that particular block info, this will be useful
+ * while loading the tree concurrently so only block level lock will be applied another
+ * block can be loaded concurrently
+ */
+ private Map<String, Object> segmentLockMap;
+
+ /**
+ * constructor to initialize the BlockletDataMapIndexStore
+ *
+ * @param carbonStorePath
+ * @param lruCache
+ */
+ public BlockletDataMapIndexStore(String carbonStorePath, CarbonLRUCache lruCache) {
+ this.carbonStorePath = carbonStorePath;
+ this.lruCache = lruCache;
+ segmentLockMap = new ConcurrentHashMap<String, Object>();
+ }
+
+ @Override public BlockletDataMap get(TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier)
+ throws IOException {
+ String lruCacheKey = tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier();
+ BlockletDataMap dataMap = (BlockletDataMap) lruCache.get(lruCacheKey);
+ if (dataMap == null) {
+ try {
+ dataMap = loadAndGetDataMap(tableSegmentUniqueIdentifier);
+ } catch (IndexBuilderException e) {
+ throw new IOException(e.getMessage(), e);
+ } catch (Throwable e) {
+ throw new IOException("Problem in loading segment block.", e);
+ }
+ }
+ return dataMap;
+ }
+
+ @Override public List<BlockletDataMap> getAll(
+ List<TableBlockIndexUniqueIdentifier> tableSegmentUniqueIdentifiers) throws IOException {
+ List<BlockletDataMap> blockletDataMaps = new ArrayList<>(tableSegmentUniqueIdentifiers.size());
+ try {
+ for (TableBlockIndexUniqueIdentifier identifier : tableSegmentUniqueIdentifiers) {
+ blockletDataMaps.add(get(identifier));
+ }
+ } catch (Throwable e) {
+ for (BlockletDataMap dataMap : blockletDataMaps) {
+ dataMap.clear();
+ }
+ throw new IOException("Problem in loading segment blocks.", e);
+ }
+ return blockletDataMaps;
+ }
+
+ /**
+ * returns the BlockletDataMap if present in the cache, otherwise null
+ *
+ * @param tableSegmentUniqueIdentifier
+ * @return
+ */
+ @Override public BlockletDataMap getIfPresent(
+ TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier) {
+ BlockletDataMap dataMap = (BlockletDataMap) lruCache
+ .get(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
+ return dataMap;
+ }
+
+ /**
+ * method invalidate the segment cache for segment
+ *
+ * @param tableSegmentUniqueIdentifier
+ */
+ @Override public void invalidate(TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier) {
+ lruCache.remove(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
+ }
+
+ /**
+ * Below method will be used to load the blocklet datamap for the given
+ * segment identifier and put it into the LRU cache. Loading is guarded by
+ * a per-identifier lock so concurrent loads of the same entry are
+ * serialized while different entries can still load in parallel.
+ *
+ * @return the loaded BlockletDataMap
+ * @throws IOException
+ */
+ private BlockletDataMap loadAndGetDataMap(
+ TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier) throws IOException {
+ String uniqueTableSegmentIdentifier =
+ tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier();
+ Object lock = segmentLockMap.get(uniqueTableSegmentIdentifier);
+ if (lock == null) {
+ lock = addAndGetSegmentLock(uniqueTableSegmentIdentifier);
+ }
+ BlockletDataMap dataMap = null;
+ synchronized (lock) {
+ dataMap = new BlockletDataMap();
+ dataMap.init(tableSegmentUniqueIdentifier.getFilePath());
+ lruCache.put(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier(), dataMap,
+ dataMap.getMemorySize());
+ }
+ return dataMap;
+ }
+
+ /**
+ * Below method will be used to get the segment level lock object
+ *
+ * @param uniqueIdentifier
+ * @return lock object
+ */
+ private synchronized Object addAndGetSegmentLock(String uniqueIdentifier) {
+ // get the segment lock object if it is present then return
+ // otherwise add the new lock and return
+ Object segmentLoderLockObject = segmentLockMap.get(uniqueIdentifier);
+ if (null == segmentLoderLockObject) {
+ segmentLoderLockObject = new Object();
+ segmentLockMap.put(uniqueIdentifier, segmentLoderLockObject);
+ }
+ return segmentLoderLockObject;
+ }
+
+ /**
+ * The method clears the access count of table segments
+ *
+ * @param tableSegmentUniqueIdentifiers
+ */
+ @Override public void clearAccessCount(
+ List<TableBlockIndexUniqueIdentifier> tableSegmentUniqueIdentifiers) {
+ for (TableBlockIndexUniqueIdentifier segmentUniqueIdentifier : tableSegmentUniqueIdentifiers) {
+ BlockletDataMap cacheable =
+ (BlockletDataMap) lruCache.get(segmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
+ cacheable.clear();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
new file mode 100644
index 0000000..68dedd8
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Blocklet detail information to be sent to each executor.
+ *
+ * Implements both {@link Serializable} and Hadoop {@link Writable}; the
+ * {@link #write(DataOutput)} and {@link #readFields(DataInput)} methods must
+ * stay field-for-field symmetric or deserialization on the executor will break.
+ */
+public class BlockletDetailInfo implements Serializable, Writable {
+
+ // number of rows contained in this blocklet
+ private int rowCount;
+
+ // number of pages inside this blocklet
+ private short pagesCount;
+
+ // carbon file format version the blocklet was written with
+ private short versionNumber;
+
+ // per-dimension lengths; presumably dictionary cardinalities — TODO confirm
+ private int[] dimLens;
+
+ // timestamp of the last table schema update, carried to detect stale schema
+ private long schemaUpdatedTimeStamp;
+
+ // physical layout (offsets/lengths) of the blocklet inside the data file
+ private BlockletInfo blockletInfo;
+
+ public int getRowCount() {
+ return rowCount;
+ }
+
+ public void setRowCount(int rowCount) {
+ this.rowCount = rowCount;
+ }
+
+ // NOTE(review): field and setter use short but the getter widens to int;
+ // harmless but asymmetric — consider returning short for consistency.
+ public int getPagesCount() {
+ return pagesCount;
+ }
+
+ public void setPagesCount(short pagesCount) {
+ this.pagesCount = pagesCount;
+ }
+
+ public short getVersionNumber() {
+ return versionNumber;
+ }
+
+ public void setVersionNumber(short versionNumber) {
+ this.versionNumber = versionNumber;
+ }
+
+ public BlockletInfo getBlockletInfo() {
+ return blockletInfo;
+ }
+
+ public void setBlockletInfo(BlockletInfo blockletInfo) {
+ this.blockletInfo = blockletInfo;
+ }
+
+ public int[] getDimLens() {
+ return dimLens;
+ }
+
+ public void setDimLens(int[] dimLens) {
+ this.dimLens = dimLens;
+ }
+
+ public long getSchemaUpdatedTimeStamp() {
+ return schemaUpdatedTimeStamp;
+ }
+
+ public void setSchemaUpdatedTimeStamp(long schemaUpdatedTimeStamp) {
+ this.schemaUpdatedTimeStamp = schemaUpdatedTimeStamp;
+ }
+
+ // Serializes all fields; order must mirror readFields exactly.
+ // The dimLens length is written as a short, so at most Short.MAX_VALUE
+ // dimensions can be serialized — TODO confirm this bound is acceptable.
+ @Override public void write(DataOutput out) throws IOException {
+ out.writeInt(rowCount);
+ out.writeShort(pagesCount);
+ out.writeShort(versionNumber);
+ out.writeShort(dimLens.length);
+ for (int i = 0; i < dimLens.length; i++) {
+ out.writeInt(dimLens[i]);
+ }
+ out.writeLong(schemaUpdatedTimeStamp);
+ blockletInfo.write(out);
+ }
+
+ // Deserializes in the same field order as write().
+ @Override public void readFields(DataInput in) throws IOException {
+ rowCount = in.readInt();
+ pagesCount = in.readShort();
+ versionNumber = in.readShort();
+ dimLens = new int[in.readShort()];
+ for (int i = 0; i < dimLens.length; i++) {
+ dimLens[i] = in.readInt();
+ }
+ schemaUpdatedTimeStamp = in.readLong();
+ blockletInfo = new BlockletInfo();
+ blockletInfo.readFields(in);
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java
index 2651f15..1276494 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java
@@ -21,7 +21,7 @@ import java.util.List;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
/**
- * Interface for adding and retrieving index data.
+ * Datamap is an entity which can store and retrieve index data.
*/
public interface DataMap {
@@ -47,6 +47,12 @@ public interface DataMap {
List<Blocklet> prune(FilterResolverIntf filterExp);
/**
+ * Convert datamap to distributable object
+ * @return
+ */
+ DataMapDistributable toDistributable();
+
+ /**
* Clear complete index table and release memory.
*/
void clear();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java
new file mode 100644
index 0000000..72f714f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore;
+
+import java.util.List;
+
+import org.apache.carbondata.core.events.ChangeEvent;
+import org.apache.carbondata.core.indexstore.schema.FilterType;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+
+/**
+ * Interface for datamap factory, it is responsible for creating the datamap.
+ *
+ * A factory instance is bound to one table/datamap pair via {@link #init} and then
+ * hands out per-segment writers and readers (DataMap instances).
+ */
+public interface DataMapFactory {
+
+ /**
+ * Initialization of Datamap factory; must be called before any other method.
+ *
+ * @param identifier table the factory serves
+ * @param dataMapName unique name of the datamap within the table
+ */
+ void init(AbsoluteTableIdentifier identifier, String dataMapName);
+ /**
+ * Get the datamap writer for each segmentid.
+ *
+ * NOTE(review): identifier is already supplied in init — confirm whether this
+ * parameter is redundant or may differ from the init-time identifier.
+ *
+ * @param identifier table identifier
+ * @param segmentId segment the writer will produce index data for
+ * @return writer for the given segment
+ */
+ DataMapWriter getDataMapWriter(AbsoluteTableIdentifier identifier,
+ String segmentId);
+
+ /**
+ * Get the datamap for segmentid
+ *
+ * @param segmentId segment whose datamaps are requested
+ * @return all datamaps belonging to that segment
+ */
+ List<DataMap> getDataMaps(String segmentId);
+
+ /**
+ * Get datamap for distributable object.
+ *
+ * @param distributable serialized description of one datamap unit
+ * @return the datamap reconstructed on the executor side
+ */
+ DataMap getDataMap(DataMapDistributable distributable);
+
+ /**
+ * This method checks whether the columns and the type of filters supported
+ * for this datamap or not
+ *
+ * @param filterType type of filter expression to probe
+ * @return true if this datamap can serve the given filter type
+ */
+ boolean isFiltersSupported(FilterType filterType);
+
+ /**
+ * Notifies the factory of a table change event so it can react (e.g. invalidate
+ * cached datamaps); exact handling is implementation specific.
+ *
+ * @param event the change event
+ */
+ void fireEvent(ChangeEvent event);
+
+ /**
+ * Clears datamap of the segment
+ */
+ void clear(String segmentId);
+
+ /**
+ * Clear all datamaps from memory
+ */
+ void clear();
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java
index 06638ad..1a36187 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java
@@ -16,7 +16,9 @@
*/
package org.apache.carbondata.core.indexstore;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.carbondata.common.logging.LogService;
@@ -24,13 +26,16 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
/**
- * It maintains all the index tables in it.
+ * It maintains all the DataMaps in it.
*/
-public class DataMapStoreManager {
+public final class DataMapStoreManager {
private static DataMapStoreManager instance = new DataMapStoreManager();
- private Map<DataMapType, Map<String, TableDataMap>> dataMapMappping = new HashMap<>();
+ /**
+ * Contains the list of datamaps for each table.
+ */
+ private Map<AbsoluteTableIdentifier, List<TableDataMap>> dataMapMappping = new HashMap<>();
private static final LogService LOGGER =
LogServiceFactory.getLogService(DataMapStoreManager.class.getName());
@@ -48,56 +53,85 @@ public class DataMapStoreManager {
*/
public TableDataMap getDataMap(AbsoluteTableIdentifier identifier, String dataMapName,
DataMapType mapType) {
- Map<String, TableDataMap> map = dataMapMappping.get(mapType);
- TableDataMap dataMap = null;
- if (map == null) {
+ List<TableDataMap> tableDataMaps = dataMapMappping.get(identifier);
+ TableDataMap dataMap;
+ if (tableDataMaps == null) {
+ createTableDataMap(identifier, mapType, dataMapName);
+ tableDataMaps = dataMapMappping.get(identifier);
+ }
+ dataMap = getAbstractTableDataMap(dataMapName, tableDataMaps);
+ if (dataMap == null) {
throw new RuntimeException("Datamap does not exist");
- } else {
- dataMap = map.get(dataMapName);
- if (dataMap == null) {
- throw new RuntimeException("Datamap does not exist");
- }
}
- // Initialize datamap
- dataMap.init(identifier, dataMapName);
return dataMap;
}
/**
- * Create new datamap instance using datamap type and path
+ * Create new datamap instance using datamap name, datamap type and table identifier
*
* @param mapType
* @return
*/
- public TableDataMap createTableDataMap(AbsoluteTableIdentifier identifier, DataMapType mapType,
- String dataMapName) {
- Map<String, TableDataMap> map = dataMapMappping.get(mapType);
- if (map == null) {
- map = new HashMap<>();
- dataMapMappping.put(mapType, map);
+ private TableDataMap createTableDataMap(AbsoluteTableIdentifier identifier,
+ DataMapType mapType, String dataMapName) {
+ List<TableDataMap> tableDataMaps = dataMapMappping.get(identifier);
+ if (tableDataMaps == null) {
+ tableDataMaps = new ArrayList<>();
+ dataMapMappping.put(identifier, tableDataMaps);
}
- TableDataMap dataMap = map.get(dataMapName);
+ TableDataMap dataMap = getAbstractTableDataMap(dataMapName, tableDataMaps);
if (dataMap != null) {
throw new RuntimeException("Already datamap exists in that path with type " + mapType);
}
try {
- //TODO create datamap using @mapType.getClassName())
+ DataMapFactory dataMapFactory = mapType.getClassObject().newInstance();
+ dataMapFactory.init(identifier, dataMapName);
+ dataMap = new TableDataMap(identifier, dataMapName, dataMapFactory);
} catch (Exception e) {
LOGGER.error(e);
+ throw new RuntimeException(e);
+ }
+ tableDataMaps.add(dataMap);
+ return dataMap;
+ }
+
+ private TableDataMap getAbstractTableDataMap(String dataMapName,
+ List<TableDataMap> tableDataMaps) {
+ TableDataMap dataMap = null;
+ for (TableDataMap tableDataMap: tableDataMaps) {
+ if (tableDataMap.getDataMapName().equals(dataMapName)) {
+ dataMap = tableDataMap;
+ break;
+ }
}
- dataMap.init(identifier, dataMapName);
- map.put(dataMapName, dataMap);
return dataMap;
}
- public void clearDataMap(String dataMapName, DataMapType mapType) {
- Map<String, TableDataMap> map = dataMapMappping.get(mapType);
- if (map != null && map.get(dataMapName) != null) {
- map.remove(dataMapName).clear();
+ /**
+ * Clear the datamap/datamaps of a mentioned datamap name and table from memory
+ * @param identifier
+ * @param dataMapName
+ */
+ public void clearDataMap(AbsoluteTableIdentifier identifier, String dataMapName) {
+ List<TableDataMap> tableDataMaps = dataMapMappping.get(identifier);
+ if (tableDataMaps != null) {
+ int i = 0;
+ for (TableDataMap tableDataMap: tableDataMaps) {
+ if (tableDataMap != null && dataMapName.equals(tableDataMap.getDataMapName())) {
+ tableDataMap.clear(new ArrayList<String>());
+ tableDataMaps.remove(i);
+ break;
+ }
+ i++;
+ }
}
}
+ /**
+ * Returns the singleton instance
+ * @return
+ */
public static DataMapStoreManager getInstance() {
return instance;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java
index b6a0f5b..0059b29 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java
@@ -16,19 +16,21 @@
*/
package org.apache.carbondata.core.indexstore;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
+
/**
* Datamap type
*/
public enum DataMapType {
- BLOCKLET("org.apache.carbondata.datamap.BlockletDataMap");
+ BLOCKLET(BlockletDataMapFactory.class);
- private String className;
+ private Class<? extends DataMapFactory> classObject;
- DataMapType(String className) {
- this.className = className;
+ DataMapType(Class<? extends DataMapFactory> classObject) {
+ this.classObject = classObject;
}
- public String getClassName() {
- return className;
+ public Class<? extends DataMapFactory> getClassObject() {
+ return classObject;
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
new file mode 100644
index 0000000..7e2bc0e
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.indexstore;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+
+/**
+ * Class holds the absoluteTableIdentifier and segmentId to uniquely identify a segment
+ * index file; used as the cache key for blocklet datamaps.
+ */
+public class TableBlockIndexUniqueIdentifier {
+ /**
+ * table fully qualified identifier
+ */
+ private AbsoluteTableIdentifier absoluteTableIdentifier;
+
+ // segment (load) id within the table
+ private String segmentId;
+
+ // name of the carbonindex file inside the segment directory
+ private String carbonIndexFileName;
+
+ /**
+ * Constructor to initialize the class instance
+ *
+ * @param absoluteTableIdentifier table the index file belongs to
+ * @param segmentId segment containing the index file
+ * @param carbonIndexFileName name of the carbonindex file
+ */
+ public TableBlockIndexUniqueIdentifier(AbsoluteTableIdentifier absoluteTableIdentifier,
+ String segmentId, String carbonIndexFileName) {
+ this.absoluteTableIdentifier = absoluteTableIdentifier;
+ this.segmentId = segmentId;
+ this.carbonIndexFileName = carbonIndexFileName;
+ }
+
+ /**
+ * returns AbsoluteTableIdentifier
+ *
+ * @return table identifier
+ */
+ public AbsoluteTableIdentifier getAbsoluteTableIdentifier() {
+ return absoluteTableIdentifier;
+ }
+
+ public String getSegmentId() {
+ return segmentId;
+ }
+
+ /**
+ * method returns the id to uniquely identify a key; the format is
+ * dbName/tableName_tableId/segmentId/indexFileName (separators from
+ * CarbonCommonConstants).
+ *
+ * @return unique cache key for this segment index file
+ */
+ public String getUniqueTableSegmentIdentifier() {
+ CarbonTableIdentifier carbonTableIdentifier =
+ absoluteTableIdentifier.getCarbonTableIdentifier();
+ return carbonTableIdentifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR
+ + carbonTableIdentifier.getTableName() + CarbonCommonConstants.UNDERSCORE
+ + carbonTableIdentifier.getTableId() + CarbonCommonConstants.FILE_SEPARATOR + segmentId
+ + CarbonCommonConstants.FILE_SEPARATOR + carbonIndexFileName;
+ }
+
+ // NOTE(review): the "/Fact/Part0/Segment_" path is hard-coded, fixing the
+ // partition to Part0 — confirm a path-builder utility should be used instead.
+ public String getFilePath() {
+ return absoluteTableIdentifier.getTablePath() + "/Fact/Part0/Segment_" + segmentId + "/"
+ + carbonIndexFileName;
+ }
+
+ // Equality covers all three identifying fields; kept consistent with hashCode.
+ @Override public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ TableBlockIndexUniqueIdentifier that = (TableBlockIndexUniqueIdentifier) o;
+
+ if (!absoluteTableIdentifier.equals(that.absoluteTableIdentifier)) {
+ return false;
+ }
+ if (!segmentId.equals(that.segmentId)) {
+ return false;
+ }
+ return carbonIndexFileName.equals(that.carbonIndexFileName);
+ }
+
+ @Override public int hashCode() {
+ int result = absoluteTableIdentifier.hashCode();
+ result = 31 * result + segmentId.hashCode();
+ result = 31 * result + carbonIndexFileName.hashCode();
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java
index e1532c8..39ca4c5 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java
@@ -16,38 +16,34 @@
*/
package org.apache.carbondata.core.indexstore;
+import java.util.ArrayList;
import java.util.List;
+import org.apache.carbondata.core.events.ChangeEvent;
import org.apache.carbondata.core.events.EventListener;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-
/**
* DataMap at the table level, user can add any number of datamaps for one table. Depends
* on the filter condition it can prune the blocklets.
*/
-public interface TableDataMap extends EventListener {
+public final class TableDataMap implements EventListener {
- /**
- * It is called to initialize and load the required table datamap metadata.
- */
- void init(AbsoluteTableIdentifier identifier, String dataMapName);
+ private AbsoluteTableIdentifier identifier;
- /**
- * Gives the writer to write the metadata information of this datamap at table level.
- *
- * @return
- */
- DataMapWriter getWriter();
+ private String dataMapName;
+
+ private DataMapFactory dataMapFactory;
/**
- * Create the datamap using the segmentid and name.
- *
- * @param identifier
- * @param segmentId
- * @return
+ * It is called to initialize and load the required table datamap metadata.
*/
- DataMap createDataMap(AbsoluteTableIdentifier identifier, String segmentId);
+ public TableDataMap(AbsoluteTableIdentifier identifier, String dataMapName,
+ DataMapFactory dataMapFactory) {
+ this.identifier = identifier;
+ this.dataMapName = dataMapName;
+ this.dataMapFactory = dataMapFactory;
+ }
/**
* Pass the valid segments and prune the datamap using filter expression
@@ -56,7 +52,24 @@ public interface TableDataMap extends EventListener {
* @param filterExp
* @return
*/
- List<Blocklet> prune(List<String> segmentIds, FilterResolverIntf filterExp);
+ public List<Blocklet> prune(List<String> segmentIds, FilterResolverIntf filterExp) {
+ List<Blocklet> blocklets = new ArrayList<>();
+ for (String segmentId : segmentIds) {
+ List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentId);
+ for (DataMap dataMap : dataMaps) {
+ List<Blocklet> pruneBlocklets = dataMap.prune(filterExp);
+ blocklets.addAll(addSegmentId(pruneBlocklets, segmentId));
+ }
+ }
+ return blocklets;
+ }
+
+ private List<Blocklet> addSegmentId(List<Blocklet> pruneBlocklets, String segmentId) {
+ for (Blocklet blocklet : pruneBlocklets) {
+ blocklet.setSegmentId(segmentId);
+ }
+ return pruneBlocklets;
+ }
/**
* This is used for making the datamap distributable.
@@ -65,7 +78,16 @@ public interface TableDataMap extends EventListener {
*
* @return
*/
- List<DataMapDistributable> toDistributable(List<String> segmentIds);
+ public List<DataMapDistributable> toDistributable(List<String> segmentIds) {
+ List<DataMapDistributable> distributables = new ArrayList<>();
+ for (String segmentsId : segmentIds) {
+ List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentsId);
+ for (DataMap dataMap : dataMaps) {
+ distributables.add(dataMap.toDistributable());
+ }
+ }
+ return distributables;
+ }
/**
* This method is used from any machine after it is distributed. It takes the distributable object
@@ -75,20 +97,37 @@ public interface TableDataMap extends EventListener {
* @param filterExp
* @return
*/
- List<Blocklet> prune(DataMapDistributable distributable, FilterResolverIntf filterExp);
+ public List<Blocklet> prune(DataMapDistributable distributable, FilterResolverIntf filterExp) {
+ return dataMapFactory.getDataMap(distributable).prune(filterExp);
+ }
+
+ @Override public void fireEvent(ChangeEvent event) {
+ dataMapFactory.fireEvent(event);
+ }
/**
- * This method checks whether the columns and the type of filters supported
- * for this datamap or not
- *
- * @param filterExp
- * @return
+ * Clear only the datamaps of the segments
+ * @param segmentIds
*/
- boolean isFiltersSupported(FilterResolverIntf filterExp);
+ public void clear(List<String> segmentIds) {
+ for (String segmentId: segmentIds) {
+ dataMapFactory.clear(segmentId);
+ }
+ }
/**
- * Clears table level datamap
+ * Clears all datamap
+ */
+ public void clear() {
+ dataMapFactory.clear();
+ }
+ /**
+ * Get the unique name of datamap
+ *
+ * @return
*/
- void clear();
+ public String getDataMapName() {
+ return dataMapName;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
new file mode 100644
index 0000000..8246f99
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore;
+
+import org.apache.carbondata.core.indexstore.row.DataMapRow;
+import org.apache.carbondata.core.indexstore.row.UnsafeDataMapRow;
+import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
+import org.apache.carbondata.core.memory.MemoryAllocator;
+import org.apache.carbondata.core.memory.MemoryAllocatorFactory;
+import org.apache.carbondata.core.memory.MemoryBlock;
+
+import static org.apache.carbondata.core.memory.CarbonUnsafe.BYTE_ARRAY_OFFSET;
+import static org.apache.carbondata.core.memory.CarbonUnsafe.unsafe;
+
+/**
+ * Store the data map row @{@link DataMapRow} data to unsafe.
+ *
+ * Rows are appended sequentially into a single memory block; per-row start
+ * offsets are kept in the pointers array so rows can be addressed by index.
+ * Not thread-safe: callers must serialize writes externally.
+ */
+public class UnsafeMemoryDMStore {
+
+ // backing memory region all rows are written into
+ private MemoryBlock memoryBlock;
+
+ // growth increment (8 MB); also the initial allocation size
+ private static int capacity = 8 * 1024 * 1024;
+
+ // current size of memoryBlock in bytes
+ private int allocatedSize;
+
+ // number of bytes written so far; doubles as the next write offset
+ private int runningLength;
+
+ private MemoryAllocator memoryAllocator;
+
+ // guards against double-free in freeMemory()
+ private boolean isMemoryFreed;
+
+ // column layout used to encode/decode each row
+ private DataMapSchema[] schema;
+
+ // start offset of each row inside memoryBlock, indexed by row number
+ private int[] pointers;
+
+ private int rowCount;
+
+ public UnsafeMemoryDMStore(DataMapSchema[] schema) {
+ this.schema = schema;
+ this.memoryAllocator = MemoryAllocatorFactory.INSATANCE.getMemoryAllocator();
+ this.allocatedSize = capacity;
+ this.memoryBlock = memoryAllocator.allocate(allocatedSize);
+ this.pointers = new int[1000];
+ }
+
+ /**
+ * Check memory is sufficient or not, if not sufficient allocate more memory and copy old data to
+ * new one.
+ *
+ * NOTE(review): the block grows by exactly one `capacity` increment — if a single
+ * row were larger than `capacity` this could still be insufficient; confirm the
+ * maximum row size or grow in a loop.
+ *
+ * @param rowSize size in bytes of the row about to be written
+ */
+ private void ensureSize(int rowSize) {
+ if (runningLength + rowSize >= allocatedSize) {
+ MemoryBlock allocate =
+ MemoryAllocatorFactory.INSATANCE.getMemoryAllocator().allocate(allocatedSize + capacity);
+ unsafe.copyMemory(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset(),
+ allocate.getBaseObject(), allocate.getBaseOffset(), runningLength);
+ memoryAllocator.free(memoryBlock);
+ allocatedSize = allocatedSize + capacity;
+ memoryBlock = allocate;
+ }
+ // grow the row-pointer array in chunks of 1000 entries when it fills up
+ if (this.pointers.length <= rowCount + 1) {
+ int[] newPointer = new int[pointers.length + 1000];
+ System.arraycopy(pointers, 0, newPointer, 0, pointers.length);
+ this.pointers = newPointer;
+ }
+ }
+
+ /**
+ * Add the index row to unsafe.
+ *
+ * Records the row's start offset in `pointers`, then writes each schema column
+ * in order, advancing runningLength as a side effect of addToUnsafe.
+ *
+ * @param indexRow row to append
+ */
+ public void addIndexRowToUnsafe(DataMapRow indexRow) {
+ // First calculate the required memory to keep the row in unsafe
+ int rowSize = indexRow.getTotalSizeInBytes();
+ // Check whether allocated memory is sufficient or not.
+ ensureSize(rowSize);
+ int pointer = runningLength;
+
+ for (int i = 0; i < schema.length; i++) {
+ addToUnsafe(schema[i], indexRow, i);
+ }
+ pointers[rowCount++] = pointer;
+ }
+
+ // Writes one column value at the current runningLength offset and advances it.
+ // FIXED columns are written raw; VARIABLE columns are length-prefixed with a
+ // 2-byte short; STRUCT columns recurse over their child schemas.
+ private void addToUnsafe(DataMapSchema schema, DataMapRow row, int index) {
+ switch (schema.getSchemaType()) {
+ case FIXED:
+ switch (schema.getDataType()) {
+ case BYTE:
+ unsafe.putByte(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+ row.getByte(index));
+ runningLength += row.getSizeInBytes(index);
+ break;
+ case SHORT:
+ unsafe
+ .putShort(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+ row.getShort(index));
+ runningLength += row.getSizeInBytes(index);
+ break;
+ case INT:
+ unsafe.putInt(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+ row.getInt(index));
+ runningLength += row.getSizeInBytes(index);
+ break;
+ case LONG:
+ unsafe.putLong(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+ row.getLong(index));
+ runningLength += row.getSizeInBytes(index);
+ break;
+ case FLOAT:
+ unsafe
+ .putFloat(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+ row.getFloat(index));
+ runningLength += row.getSizeInBytes(index);
+ break;
+ case DOUBLE:
+ unsafe
+ .putDouble(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+ row.getDouble(index));
+ runningLength += row.getSizeInBytes(index);
+ break;
+ case BYTE_ARRAY:
+ byte[] data = row.getByteArray(index);
+ unsafe.copyMemory(data, BYTE_ARRAY_OFFSET, memoryBlock.getBaseObject(),
+ memoryBlock.getBaseOffset() + runningLength, data.length);
+ runningLength += row.getSizeInBytes(index);
+ break;
+ }
+ break;
+ case VARIABLE:
+ byte[] data = row.getByteArray(index);
+ // NOTE(review): unlike every FIXED branch above, this putShort omits the
+ // memoryBlock.getBaseObject() argument, so it uses the absolute-address
+ // Unsafe overload; for an on-heap memory block this writes to the wrong
+ // location — likely a bug, compare the two- vs three-argument overloads.
+ unsafe.putShort(memoryBlock.getBaseOffset() + runningLength, (short) data.length);
+ runningLength += 2;
+ unsafe.copyMemory(data, BYTE_ARRAY_OFFSET, memoryBlock.getBaseObject(),
+ memoryBlock.getBaseOffset() + runningLength, data.length);
+ runningLength += data.length;
+ break;
+ case STRUCT:
+ DataMapSchema[] childSchemas =
+ ((DataMapSchema.StructDataMapSchema) schema).getChildSchemas();
+ DataMapRow struct = row.getRow(index);
+ for (int i = 0; i < childSchemas.length; i++) {
+ addToUnsafe(childSchemas[i], struct, i);
+ }
+ break;
+ }
+ }
+
+ // Returns a lazily-decoded view over the row at the given index.
+ public DataMapRow getUnsafeRow(int index) {
+ assert (index < rowCount);
+ return new UnsafeDataMapRow(schema, memoryBlock, pointers[index]);
+ }
+
+ // Shrinks the memory block and the pointer array to the exact used size.
+ // Call once after the last addIndexRowToUnsafe; no further writes afterwards.
+ public void finishWriting() {
+ if (runningLength < allocatedSize) {
+ MemoryBlock allocate =
+ MemoryAllocatorFactory.INSATANCE.getMemoryAllocator().allocate(runningLength);
+ unsafe.copyMemory(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset(),
+ allocate.getBaseObject(), allocate.getBaseOffset(), runningLength);
+ memoryAllocator.free(memoryBlock);
+ memoryBlock = allocate;
+ }
+ // Compact pointers.
+ if (rowCount < pointers.length) {
+ int[] newPointer = new int[rowCount];
+ System.arraycopy(pointers, 0, newPointer, 0, rowCount);
+ this.pointers = newPointer;
+ }
+ }
+
+ // Idempotent release of the backing memory block.
+ public void freeMemory() {
+ if (!isMemoryFreed) {
+ memoryAllocator.free(memoryBlock);
+ isMemoryFreed = true;
+ }
+ }
+
+ // Bytes actually written, not the allocated capacity.
+ public int getMemoryUsed() {
+ return runningLength;
+ }
+
+ public DataMapSchema[] getSchema() {
+ return schema;
+ }
+
+ public int getRowCount() {
+ return rowCount;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDMComparator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDMComparator.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDMComparator.java
new file mode 100644
index 0000000..9a50600
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDMComparator.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+
+import org.apache.carbondata.core.indexstore.row.DataMapRow;
+import org.apache.carbondata.core.util.ByteUtil;
+
+/**
+ * Data map comparator
+ */
+public class BlockletDMComparator implements Comparator<DataMapRow> {
+
+ /**
+ * no dictionary column value is of variable length so in each column value
+ * it will -1
+ */
+ private static final int NO_DCITIONARY_COLUMN_VALUE = -1;
+
+ /**
+ * sized of the short value in bytes
+ */
+ private static final short SHORT_SIZE_IN_BYTES = 2;
+
+ private int[] eachColumnValueSize;
+
+ /**
+ * the number of no dictionary columns in SORT_COLUMNS
+ */
+ private int numberOfNoDictSortColumns;
+
+ /**
+ * the number of columns in SORT_COLUMNS
+ */
+ private int numberOfSortColumns;
+
+ public BlockletDMComparator(int[] eachColumnValueSize, int numberOfSortColumns,
+ int numberOfNoDictSortColumns) {
+ this.eachColumnValueSize = eachColumnValueSize;
+ this.numberOfNoDictSortColumns = numberOfNoDictSortColumns;
+ this.numberOfSortColumns = numberOfSortColumns;
+ }
+
+ @Override public int compare(DataMapRow first, DataMapRow second) {
+ int dictionaryKeyOffset = 0;
+ int nonDictionaryKeyOffset = 0;
+ int compareResult = 0;
+ int processedNoDictionaryColumn = numberOfNoDictSortColumns;
+ byte[][] firstBytes = splitKey(first.getByteArray(0));
+ byte[][] secondBytes = splitKey(first.getByteArray(0));
+ byte[] firstNoDictionaryKeys = firstBytes[1];
+ ByteBuffer firstNoDictionaryKeyBuffer = ByteBuffer.wrap(firstNoDictionaryKeys);
+ byte[] secondNoDictionaryKeys = secondBytes[1];
+ ByteBuffer secondNoDictionaryKeyBuffer = ByteBuffer.wrap(secondNoDictionaryKeys);
+ int actualOffset = 0;
+ int actualOffset1 = 0;
+ int firstNoDcitionaryLength = 0;
+ int secondNodeDictionaryLength = 0;
+
+ for (int i = 0; i < numberOfSortColumns; i++) {
+
+ if (eachColumnValueSize[i] != NO_DCITIONARY_COLUMN_VALUE) {
+ byte[] firstDictionaryKeys = firstBytes[0];
+ byte[] secondDictionaryKeys = secondBytes[0];
+ compareResult = ByteUtil.UnsafeComparer.INSTANCE
+ .compareTo(firstDictionaryKeys, dictionaryKeyOffset, eachColumnValueSize[i],
+ secondDictionaryKeys, dictionaryKeyOffset, eachColumnValueSize[i]);
+ dictionaryKeyOffset += eachColumnValueSize[i];
+ } else {
+ if (processedNoDictionaryColumn > 1) {
+ actualOffset = firstNoDictionaryKeyBuffer.getShort(nonDictionaryKeyOffset);
+ firstNoDcitionaryLength =
+ firstNoDictionaryKeyBuffer.getShort(nonDictionaryKeyOffset + SHORT_SIZE_IN_BYTES)
+ - actualOffset;
+ actualOffset1 = secondNoDictionaryKeyBuffer.getShort(nonDictionaryKeyOffset);
+ secondNodeDictionaryLength =
+ secondNoDictionaryKeyBuffer.getShort(nonDictionaryKeyOffset + SHORT_SIZE_IN_BYTES)
+ - actualOffset1;
+ compareResult = ByteUtil.UnsafeComparer.INSTANCE
+ .compareTo(firstNoDictionaryKeys, actualOffset, firstNoDcitionaryLength,
+ secondNoDictionaryKeys, actualOffset1, secondNodeDictionaryLength);
+ nonDictionaryKeyOffset += SHORT_SIZE_IN_BYTES;
+ processedNoDictionaryColumn--;
+ } else {
+ actualOffset = firstNoDictionaryKeyBuffer.getShort(nonDictionaryKeyOffset);
+ actualOffset1 = secondNoDictionaryKeyBuffer.getShort(nonDictionaryKeyOffset);
+ firstNoDcitionaryLength = firstNoDictionaryKeys.length - actualOffset;
+ secondNodeDictionaryLength = secondNoDictionaryKeys.length - actualOffset1;
+ compareResult = ByteUtil.UnsafeComparer.INSTANCE
+ .compareTo(firstNoDictionaryKeys, actualOffset, firstNoDcitionaryLength,
+ secondNoDictionaryKeys, actualOffset1, secondNodeDictionaryLength);
+ }
+ }
+ if (compareResult != 0) {
+ return compareResult;
+ }
+ }
+
+ return 0;
+ }
+
+ /**
+ * Split the index key to dictionary and no dictionary.
+ * @param startKey
+ * @return
+ */
+ private byte[][] splitKey(byte[] startKey) {
+ ByteBuffer buffer = ByteBuffer.wrap(startKey);
+ buffer.rewind();
+ int dictonaryKeySize = buffer.getInt();
+ int nonDictonaryKeySize = buffer.getInt();
+ byte[] dictionaryKey = new byte[dictonaryKeySize];
+ buffer.get(dictionaryKey);
+ byte[] nonDictionaryKey = new byte[nonDictonaryKeySize];
+ buffer.get(nonDictionaryKey);
+ return new byte[][] {dictionaryKey, nonDictionaryKey};
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
new file mode 100644
index 0000000..79aa091
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.Cacheable;
+import org.apache.carbondata.core.datastore.IndexKey;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
+import org.apache.carbondata.core.indexstore.DataMap;
+import org.apache.carbondata.core.indexstore.DataMapDistributable;
+import org.apache.carbondata.core.indexstore.DataMapWriter;
+import org.apache.carbondata.core.indexstore.UnsafeMemoryDMStore;
+import org.apache.carbondata.core.indexstore.row.DataMapRow;
+import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
+import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataFileFooterConverter;
+
+/**
+ * Datamap implementation for blocklet.
+ */
+public class BlockletDataMap implements DataMap, Cacheable {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(BlockletDataMap.class.getName());
+
+ private static int KEY_INDEX = 0;
+
+ private static int MIN_VALUES_INDEX = 1;
+
+ private static int MAX_VALUES_INDEX = 2;
+
+ private static int ROW_COUNT_INDEX = 3;
+
+ private static int FILE_PATH_INDEX = 4;
+
+ private static int PAGE_COUNT_INDEX = 5;
+
+ private static int VERSION_INDEX = 6;
+
+ private static int SCHEMA_UPADATED_TIME_INDEX = 7;
+
+ private static int BLOCK_INFO_INDEX = 8;
+
+ private UnsafeMemoryDMStore unsafeMemoryDMStore;
+
+ private SegmentProperties segmentProperties;
+
+ private int[] columnCardinality;
+
+ @Override public DataMapWriter getWriter() {
+ return null;
+ }
+
+ @Override public void init(String path) {
+ DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+ try {
+ List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo(path);
+ for (DataFileFooter fileFooter : indexInfo) {
+ List<ColumnSchema> columnInTable = fileFooter.getColumnInTable();
+ if (segmentProperties == null) {
+ columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality();
+ segmentProperties = new SegmentProperties(columnInTable, columnCardinality);
+ createSchema(segmentProperties);
+ }
+ TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
+ fileFooter = CarbonUtil.readMetadatFile(blockInfo);
+
+ loadToUnsafe(fileFooter, segmentProperties, blockInfo.getFilePath());
+ }
+ if (unsafeMemoryDMStore != null) {
+ unsafeMemoryDMStore.finishWriting();
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void loadToUnsafe(DataFileFooter fileFooter, SegmentProperties segmentProperties,
+ String filePath) {
+ int[] minMaxLen = segmentProperties.getEachDimColumnValueSize();
+ List<BlockletInfo> blockletList = fileFooter.getBlockletList();
+ DataMapSchema[] schema = unsafeMemoryDMStore.getSchema();
+ for (int index = 0; index < blockletList.size(); index++) {
+ DataMapRow row = new DataMapRowImpl(schema);
+ int ordinal = 0;
+ BlockletInfo blockletInfo = blockletList.get(index);
+
+ // add start key as index key
+ row.setByteArray(blockletInfo.getBlockletIndex().getBtreeIndex().getStartKey(), ordinal++);
+
+ BlockletMinMaxIndex minMaxIndex = blockletInfo.getBlockletIndex().getMinMaxIndex();
+ row.setRow(addMinMax(minMaxLen, schema[ordinal], minMaxIndex.getMinValues()), ordinal);
+ ordinal++;
+ row.setRow(addMinMax(minMaxLen, schema[ordinal], minMaxIndex.getMaxValues()), ordinal);
+ ordinal++;
+
+ row.setInt(blockletInfo.getNumberOfRows(), ordinal++);
+
+ // add file path
+ byte[] filePathBytes = filePath.getBytes();
+ row.setByteArray(filePathBytes, ordinal++);
+
+ // add pages
+ row.setShort((short) blockletInfo.getNumberOfPages(), ordinal++);
+
+ // add version number
+ row.setShort(fileFooter.getVersionId().number(), ordinal++);
+
+ // add schema updated time
+ row.setLong(fileFooter.getSchemaUpdatedTimeStamp(), ordinal++);
+
+ // add blocklet info
+ byte[] serializedData;
+ try {
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ DataOutput dataOutput = new DataOutputStream(stream);
+ blockletInfo.write(dataOutput);
+ serializedData = stream.toByteArray();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ row.setByteArray(serializedData, ordinal);
+ unsafeMemoryDMStore.addIndexRowToUnsafe(row);
+ }
+ }
+
+ private DataMapRow addMinMax(int[] minMaxLen, DataMapSchema dataMapSchema, byte[][] minValues) {
+ DataMapSchema[] minSchemas =
+ ((DataMapSchema.StructDataMapSchema) dataMapSchema).getChildSchemas();
+ DataMapRow minRow = new DataMapRowImpl(minSchemas);
+ int minOrdinal = 0;
+ // min value adding
+ for (int i = 0; i < minMaxLen.length; i++) {
+ minRow.setByteArray(minValues[i], minOrdinal++);
+ }
+ return minRow;
+ }
+
+ private void createSchema(SegmentProperties segmentProperties) {
+ List<DataMapSchema> indexSchemas = new ArrayList<>();
+
+ // Index key
+ indexSchemas.add(new DataMapSchema.VariableDataMapSchema(DataType.BYTE_ARRAY));
+ int[] minMaxLen = segmentProperties.getEachDimColumnValueSize();
+ // do it 2 times, one for min and one for max.
+ for (int k = 0; k < 2; k++) {
+ DataMapSchema[] mapSchemas = new DataMapSchema[minMaxLen.length];
+ for (int i = 0; i < minMaxLen.length; i++) {
+ if (minMaxLen[i] <= 0) {
+ mapSchemas[i] = new DataMapSchema.VariableDataMapSchema(DataType.BYTE_ARRAY);
+ } else {
+ mapSchemas[i] = new DataMapSchema.FixedDataMapSchema(DataType.BYTE_ARRAY, minMaxLen[i]);
+ }
+ }
+ DataMapSchema mapSchema = new DataMapSchema.StructDataMapSchema(DataType.STRUCT, mapSchemas);
+ indexSchemas.add(mapSchema);
+ }
+
+ // for number of rows.
+ indexSchemas.add(new DataMapSchema.FixedDataMapSchema(DataType.INT));
+
+ // for table block path
+ indexSchemas.add(new DataMapSchema.VariableDataMapSchema(DataType.BYTE_ARRAY));
+
+ // for number of pages.
+ indexSchemas.add(new DataMapSchema.FixedDataMapSchema(DataType.SHORT));
+
+ // for version number.
+ indexSchemas.add(new DataMapSchema.FixedDataMapSchema(DataType.SHORT));
+
+ // for schema updated time.
+ indexSchemas.add(new DataMapSchema.FixedDataMapSchema(DataType.LONG));
+
+ //for blocklet info
+ indexSchemas.add(new DataMapSchema.VariableDataMapSchema(DataType.BYTE_ARRAY));
+
+ unsafeMemoryDMStore =
+ new UnsafeMemoryDMStore(indexSchemas.toArray(new DataMapSchema[indexSchemas.size()]));
+ }
+
+ @Override public List<Blocklet> prune(FilterResolverIntf filterExp) {
+
+ // getting the start and end index key based on filter for hitting the
+ // selected block reference nodes based on filter resolver tree.
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("preparing the start and end key for finding"
+ + "start and end block as per filter resolver");
+ }
+ List<Blocklet> blocklets = new ArrayList<>();
+ Comparator<DataMapRow> comparator =
+ new BlockletDMComparator(segmentProperties.getEachDimColumnValueSize(),
+ segmentProperties.getNumberOfSortColumns(),
+ segmentProperties.getNumberOfNoDictSortColumns());
+ List<IndexKey> listOfStartEndKeys = new ArrayList<IndexKey>(2);
+ FilterUtil
+ .traverseResolverTreeAndGetStartAndEndKey(segmentProperties, filterExp, listOfStartEndKeys);
+ // reading the first value from list which has start key
+ IndexKey searchStartKey = listOfStartEndKeys.get(0);
+ // reading the last value from list which has end key
+ IndexKey searchEndKey = listOfStartEndKeys.get(1);
+ if (null == searchStartKey && null == searchEndKey) {
+ try {
+ // TODO need to handle for no dictionary dimensions
+ searchStartKey = FilterUtil.prepareDefaultStartIndexKey(segmentProperties);
+ // TODO need to handle for no dictionary dimensions
+ searchEndKey = FilterUtil.prepareDefaultEndIndexKey(segmentProperties);
+ } catch (KeyGenException e) {
+ return null;
+ }
+ }
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "Successfully retrieved the start and end key" + "Dictionary Start Key: " + searchStartKey
+ .getDictionaryKeys() + "No Dictionary Start Key " + searchStartKey
+ .getNoDictionaryKeys() + "Dictionary End Key: " + searchEndKey.getDictionaryKeys()
+ + "No Dictionary End Key " + searchEndKey.getNoDictionaryKeys());
+ }
+ if (filterExp == null) {
+ int rowCount = unsafeMemoryDMStore.getRowCount();
+ for (int i = 0; i < rowCount; i++) {
+ DataMapRow unsafeRow = unsafeMemoryDMStore.getUnsafeRow(i);
+ blocklets.add(createBlocklet(unsafeRow, i));
+ }
+ } else {
+ int startIndex = findStartIndex(convertToRow(searchStartKey), comparator);
+ int endIndex = findEndIndex(convertToRow(searchEndKey), comparator);
+ FilterExecuter filterExecuter =
+ FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null);
+ while (startIndex <= endIndex) {
+ DataMapRow unsafeRow = unsafeMemoryDMStore.getUnsafeRow(startIndex);
+ BitSet bitSet = filterExecuter.isScanRequired(getMinMaxValue(unsafeRow, MAX_VALUES_INDEX),
+ getMinMaxValue(unsafeRow, MIN_VALUES_INDEX));
+ if (!bitSet.isEmpty()) {
+ blocklets.add(createBlocklet(unsafeRow, startIndex));
+ }
+ startIndex++;
+ }
+ }
+
+ return blocklets;
+ }
+
+ private byte[][] getMinMaxValue(DataMapRow row, int index) {
+ DataMapRow minMaxRow = row.getRow(index);
+ byte[][] minMax = new byte[minMaxRow.getColumnCount()][];
+ for (int i = 0; i < minMax.length; i++) {
+ minMax[i] = minMaxRow.getByteArray(i);
+ }
+ return minMax;
+ }
+
+ private Blocklet createBlocklet(DataMapRow row, int blockletId) {
+ Blocklet blocklet =
+ new Blocklet(new String(row.getByteArray(FILE_PATH_INDEX)), blockletId + "");
+ BlockletDetailInfo detailInfo = new BlockletDetailInfo();
+ detailInfo.setRowCount(row.getInt(ROW_COUNT_INDEX));
+ detailInfo.setPagesCount(row.getShort(PAGE_COUNT_INDEX));
+ detailInfo.setVersionNumber(row.getShort(VERSION_INDEX));
+ detailInfo.setDimLens(columnCardinality);
+ detailInfo.setSchemaUpdatedTimeStamp(row.getLong(SCHEMA_UPADATED_TIME_INDEX));
+ BlockletInfo blockletInfo = new BlockletInfo();
+ try {
+ byte[] byteArray = row.getByteArray(BLOCK_INFO_INDEX);
+ ByteArrayInputStream stream = new ByteArrayInputStream(byteArray);
+ DataInputStream inputStream = new DataInputStream(stream);
+ blockletInfo.readFields(inputStream);
+ inputStream.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ detailInfo.setBlockletInfo(blockletInfo);
+ blocklet.setDetailInfo(detailInfo);
+ return blocklet;
+ }
+
+ /**
+ * Binary search used to get the first tentative index row based on
+ * search key
+ *
+ * @param key search key
+ * @return first tentative block
+ */
+ private int findStartIndex(DataMapRow key, Comparator<DataMapRow> comparator) {
+ int childNodeIndex;
+ int low = 0;
+ int high = unsafeMemoryDMStore.getRowCount() - 1;
+ int mid = 0;
+ int compareRes = -1;
+ //
+ while (low <= high) {
+ mid = (low + high) >>> 1;
+ // compare the entries
+ compareRes = comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(mid));
+ if (compareRes < 0) {
+ high = mid - 1;
+ } else if (compareRes > 0) {
+ low = mid + 1;
+ } else {
+ // if key is matched then get the first entry
+ int currentPos = mid;
+ while (currentPos - 1 >= 0
+ && comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(currentPos - 1)) == 0) {
+ currentPos--;
+ }
+ mid = currentPos;
+ break;
+ }
+ }
+ // if compare result is less than zero then we
+ // and mid is more than 0 then we need to previous block as duplicates
+ // record can be present
+ if (compareRes < 0) {
+ if (mid > 0) {
+ mid--;
+ }
+ childNodeIndex = mid;
+ } else {
+ childNodeIndex = mid;
+ }
+ // get the leaf child
+ return childNodeIndex;
+ }
+
+ /**
+ * Binary search used to get the last tentative block based on
+ * search key
+ *
+ * @param key search key
+ * @return first tentative block
+ */
+ private int findEndIndex(DataMapRow key, Comparator<DataMapRow> comparator) {
+ int childNodeIndex;
+ int low = 0;
+ int high = unsafeMemoryDMStore.getRowCount() - 1;
+ int mid = 0;
+ int compareRes = -1;
+ //
+ while (low <= high) {
+ mid = (low + high) >>> 1;
+ // compare the entries
+ compareRes = comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(mid));
+ if (compareRes < 0) {
+ high = mid - 1;
+ } else if (compareRes > 0) {
+ low = mid + 1;
+ } else {
+ int currentPos = mid;
+ // if key is matched then get the first entry
+ while (currentPos + 1 < unsafeMemoryDMStore.getRowCount()
+ && comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(currentPos + 1)) == 0) {
+ currentPos++;
+ }
+ mid = currentPos;
+ break;
+ }
+ }
+ // if compare result is less than zero then we
+ // and mid is more than 0 then we need to previous block as duplicates
+ // record can be present
+ if (compareRes < 0) {
+ if (mid > 0) {
+ mid--;
+ }
+ childNodeIndex = mid;
+ } else {
+ childNodeIndex = mid;
+ }
+ return childNodeIndex;
+ }
+
+ private DataMapRow convertToRow(IndexKey key) {
+ ByteBuffer buffer =
+ ByteBuffer.allocate(key.getDictionaryKeys().length + key.getNoDictionaryKeys().length + 8);
+ buffer.putInt(key.getDictionaryKeys().length);
+ buffer.putInt(key.getNoDictionaryKeys().length);
+ buffer.put(key.getDictionaryKeys());
+ buffer.put(key.getNoDictionaryKeys());
+ DataMapRowImpl dataMapRow = new DataMapRowImpl(unsafeMemoryDMStore.getSchema());
+ dataMapRow.setByteArray(buffer.array(), 0);
+ return dataMapRow;
+ }
+
+ @Override public void clear() {
+ unsafeMemoryDMStore.freeMemory();
+ unsafeMemoryDMStore = null;
+ segmentProperties = null;
+ }
+
+ @Override public long getFileTimeStamp() {
+ return 0;
+ }
+
+ @Override public int getAccessCount() {
+ return 0;
+ }
+
+ @Override public long getMemorySize() {
+ return unsafeMemoryDMStore.getMemoryUsed();
+ }
+
+ @Override public DataMapDistributable toDistributable() {
+ // TODO
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
new file mode 100644
index 0000000..2fe6643
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.events.ChangeEvent;
+import org.apache.carbondata.core.indexstore.DataMap;
+import org.apache.carbondata.core.indexstore.DataMapDistributable;
+import org.apache.carbondata.core.indexstore.DataMapFactory;
+import org.apache.carbondata.core.indexstore.DataMapWriter;
+import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
+import org.apache.carbondata.core.indexstore.schema.FilterType;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+
+/**
+ * Table map for blocklet
+ */
+public class BlockletDataMapFactory implements DataMapFactory {
+
+ private AbsoluteTableIdentifier identifier;
+
+ private Map<String, List<TableBlockIndexUniqueIdentifier>> segmentMap = new HashMap<>();
+
+ private Cache<TableBlockIndexUniqueIdentifier, DataMap> cache;
+
+ public void init(AbsoluteTableIdentifier identifier, String dataMapName) {
+ this.identifier = identifier;
+ cache = CacheProvider.getInstance()
+ .createCache(CacheType.DRIVER_BLOCKLET_DATAMAP, identifier.getStorePath());
+ }
+
+ public DataMapWriter getDataMapWriter(AbsoluteTableIdentifier identifier, String segmentId) {
+ return null;
+ }
+
+ public List<DataMap> getDataMaps(String segmentId) {
+ List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
+ segmentMap.get(segmentId);
+ if (tableBlockIndexUniqueIdentifiers == null) {
+ tableBlockIndexUniqueIdentifiers = new ArrayList<>();
+ String path = identifier.getTablePath() + "/Fact/Part0/Segment_" + segmentId;
+ FileFactory.FileType fileType = FileFactory.getFileType(path);
+ CarbonFile carbonFile = FileFactory.getCarbonFile(path, fileType);
+ CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return file.getName().endsWith(".carbonindex");
+ }
+ });
+ for (int i = 0; i < listFiles.length; i++) {
+ tableBlockIndexUniqueIdentifiers.add(
+ new TableBlockIndexUniqueIdentifier(identifier, segmentId, listFiles[i].getName()));
+ }
+ }
+
+ try {
+ return cache.getAll(tableBlockIndexUniqueIdentifiers);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override public boolean isFiltersSupported(FilterType filterType) {
+ return true;
+ }
+
+ public void clear(String segmentId) {
+ List<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segmentId);
+ if (blockIndexes != null) {
+ for (TableBlockIndexUniqueIdentifier blockIndex : blockIndexes) {
+ DataMap dataMap = cache.getIfPresent(blockIndex);
+ dataMap.clear();
+ cache.invalidate(blockIndex);
+ }
+ }
+ }
+
+ @Override public void clear() {
+ for (String segmentId: segmentMap.keySet()) {
+ clear(segmentId);
+ }
+ }
+
+ @Override public DataMap getDataMap(DataMapDistributable distributable) {
+ return null;
+ }
+
+ @Override public void fireEvent(ChangeEvent event) {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java
new file mode 100644
index 0000000..5509c75
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache;
+import org.apache.carbondata.core.datastore.DataRefNode;
+import org.apache.carbondata.core.datastore.FileHolder;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.reader.CarbonDataReaderFactory;
+import org.apache.carbondata.core.datastore.chunk.reader.DimensionColumnChunkReader;
+import org.apache.carbondata.core.datastore.chunk.reader.MeasureColumnChunkReader;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+
+/**
+ * DataRefNode view over a list of blocklet-level TableBlockInfo entries;
+ * each node wraps one entry (at {@code index}) and exposes its dimension and
+ * measure chunks through version-specific chunk readers.
+ */
+public class BlockletDataRefNodeWrapper implements DataRefNode {
+
+  private List<TableBlockInfo> blockInfos;
+
+  // position of this node within blockInfos
+  private int index;
+
+  private int[] dimensionLens;
+
+  private BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache;
+
+  public BlockletDataRefNodeWrapper(List<TableBlockInfo> blockInfos, int index,
+      int[] dimensionLens) {
+    this.blockInfos = blockInfos;
+    this.index = index;
+    this.dimensionLens = dimensionLens;
+  }
+
+  @Override public DataRefNode getNextDataRefNode() {
+    if (index + 1 < blockInfos.size()) {
+      // BUG FIX: the wrapper was constructed but never returned, so the
+      // linked-node traversal always terminated after the first node.
+      return new BlockletDataRefNodeWrapper(blockInfos, index + 1, dimensionLens);
+    }
+    return null;
+  }
+
+  @Override public int nodeSize() {
+    return blockInfos.get(index).getDetailInfo().getRowCount();
+  }
+
+  @Override public long nodeNumber() {
+    return index;
+  }
+
+  @Override public byte[][] getColumnsMaxValue() {
+    return null;
+  }
+
+  @Override public byte[][] getColumnsMinValue() {
+    return null;
+  }
+
+  @Override
+  public DimensionRawColumnChunk[] getDimensionChunks(FileHolder fileReader, int[][] blockIndexes)
+      throws IOException {
+    DimensionColumnChunkReader dimensionChunksReader = getDimensionColumnChunkReader();
+    return dimensionChunksReader.readRawDimensionChunks(fileReader, blockIndexes);
+  }
+
+  @Override
+  public DimensionRawColumnChunk getDimensionChunk(FileHolder fileReader, int blockIndexes)
+      throws IOException {
+    DimensionColumnChunkReader dimensionChunksReader = getDimensionColumnChunkReader();
+    return dimensionChunksReader.readRawDimensionChunk(fileReader, blockIndexes);
+  }
+
+  @Override
+  public MeasureRawColumnChunk[] getMeasureChunks(FileHolder fileReader, int[][] blockIndexes)
+      throws IOException {
+    MeasureColumnChunkReader measureColumnChunkReader = getMeasureColumnChunkReader();
+    return measureColumnChunkReader.readRawMeasureChunks(fileReader, blockIndexes);
+  }
+
+  @Override public MeasureRawColumnChunk getMeasureChunk(FileHolder fileReader, int blockIndex)
+      throws IOException {
+    MeasureColumnChunkReader measureColumnChunkReader = getMeasureColumnChunkReader();
+    return measureColumnChunkReader.readRawMeasureChunk(fileReader, blockIndex);
+  }
+
+  /** Builds a dimension chunk reader matching this blocklet's format version. */
+  private DimensionColumnChunkReader getDimensionColumnChunkReader() throws IOException {
+    ColumnarFormatVersion version =
+        ColumnarFormatVersion.valueOf(blockInfos.get(index).getDetailInfo().getVersionNumber());
+    return CarbonDataReaderFactory.getInstance()
+        .getDimensionColumnChunkReader(version,
+            blockInfos.get(index).getDetailInfo().getBlockletInfo(), dimensionLens,
+            blockInfos.get(index).getFilePath());
+  }
+
+  /** Builds a measure chunk reader matching this blocklet's format version. */
+  private MeasureColumnChunkReader getMeasureColumnChunkReader() throws IOException {
+    ColumnarFormatVersion version =
+        ColumnarFormatVersion.valueOf(blockInfos.get(index).getDetailInfo().getVersionNumber());
+    return CarbonDataReaderFactory.getInstance().getMeasureColumnChunkReader(version,
+        blockInfos.get(index).getDetailInfo().getBlockletInfo(),
+        blockInfos.get(index).getFilePath());
+  }
+
+  @Override
+  public void setDeleteDeltaDataCache(BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache) {
+    this.deleteDeltaDataCache = deleteDeltaDataCache;
+  }
+
+  @Override public BlockletLevelDeleteDeltaDataCache getDeleteDeltaDataCache() {
+    return deleteDeltaDataCache;
+  }
+
+  @Override public int numberOfPages() {
+    return blockInfos.get(index).getDetailInfo().getPagesCount();
+  }
+
+  public int numberOfNodes() {
+    return blockInfos.size();
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
new file mode 100644
index 0000000..b8cffc6
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.block.AbstractIndex;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+/**
+ * Wrapper of abstract index built directly from blocklet block infos:
+ * derives the segment properties from the first block's footer and exposes
+ * the blocklets as a BlockletDataRefNodeWrapper chain.
+ * TODO it could be removed after refactor
+ */
+public class IndexWrapper extends AbstractIndex {
+
+  public IndexWrapper(List<TableBlockInfo> blockInfos) {
+    DataFileFooter footer;
+    try {
+      // all blocks of the list share one schema; the first footer suffices
+      footer = CarbonUtil.readMetadatFile(blockInfos.get(0));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    segmentProperties = new SegmentProperties(footer.getColumnInTable(),
+        footer.getSegmentInfo().getColumnCardinality());
+    dataRefNode = new BlockletDataRefNodeWrapper(blockInfos, 0,
+        segmentProperties.getDimensionColumnsValueSize());
+  }
+
+  @Override public void buildIndex(List<DataFileFooter> footerList) {
+    // nothing to build: the index is fully constructed in the constructor
+  }
+}