Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/07/27 12:42:04 UTC
[4/7] carbondata git commit: [CARBONDATA-1232] Datamap implementation for Blocklet
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index e4d3ba5..8aa18d5 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -17,14 +17,31 @@
package org.apache.carbondata.presto.impl;
-import com.facebook.presto.spi.SchemaTableName;
-import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.inject.Inject;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.*;
-import org.apache.carbondata.core.datastore.block.*;
+import org.apache.carbondata.core.datastore.DataRefNode;
+import org.apache.carbondata.core.datastore.DataRefNodeFinder;
+import org.apache.carbondata.core.datastore.IndexKey;
+import org.apache.carbondata.core.datastore.SegmentTaskIndexStore;
+import org.apache.carbondata.core.datastore.TableSegmentUniqueIdentifier;
+import org.apache.carbondata.core.datastore.block.AbstractIndex;
+import org.apache.carbondata.core.datastore.block.BlockletInfos;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.block.SegmentTaskIndexWrapper;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.datastore.exception.IndexBuilderException;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -52,18 +69,24 @@ import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.hadoop.CacheClient;
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+
+import com.facebook.presto.spi.SchemaTableName;
+import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.Inject;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.thrift.TBase;
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
import static java.util.Objects.requireNonNull;
import com.facebook.presto.spi.TableNotFoundException;
@@ -392,8 +415,9 @@ public class CarbonTableReader {
TableBlockInfo tableBlockInfo = leafNode.getTableBlockInfo();
if (IUDTable) {
- if (CarbonUtil.isInvalidTableBlock(tableBlockInfo, invalidBlockVOForSegmentId,
- updateStatusManager)) {
+ if (CarbonUtil
+ .isInvalidTableBlock(tableBlockInfo.getSegmentId(), tableBlockInfo.getFilePath(),
+ invalidBlockVOForSegmentId, updateStatusManager)) {
continue;
}
}
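Note for readers migrating code against this change: CarbonUtil.isInvalidTableBlock no longer takes the whole TableBlockInfo; it now receives the segment id and file path directly. A minimal sketch of the new call shape, using the CarbonData types referenced elsewhere in this commit; the helper name and wiring are illustrative only, not part of the commit:

    import org.apache.carbondata.core.datastore.block.TableBlockInfo;
    import org.apache.carbondata.core.mutate.UpdateVO;
    import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
    import org.apache.carbondata.core.util.CarbonUtil;

    // Illustrative helper: true when an IUD operation has invalidated the block,
    // matching the new per-field call shape in the hunk above.
    static boolean isBlockInvalidated(TableBlockInfo tableBlockInfo,
        UpdateVO invalidBlockVOForSegmentId,
        SegmentUpdateStatusManager updateStatusManager) {
      return CarbonUtil.isInvalidTableBlock(tableBlockInfo.getSegmentId(),
          tableBlockInfo.getFilePath(), invalidBlockVOForSegmentId,
          updateStatusManager);
    }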
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
index 277005b..da0d082 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
@@ -29,7 +29,8 @@ import org.apache.spark.sql.execution.command.CarbonMergerMapping
import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo}
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
-import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit}
+import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
import org.apache.carbondata.processing.model.CarbonLoadModel
@@ -71,7 +72,7 @@ class CarbonIUDMergerRDD[K, V](
var blocksOfLastSegment: List[TableBlockInfo] = null
- CarbonInputFormat.setSegmentsToAccess(
+ CarbonTableInputFormat.setSegmentsToAccess(
job.getConfiguration, carbonMergerMapping.validSegments.toList.asJava)
// get splits
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 5c0e77d..1a8183c 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -43,8 +43,8 @@ import org.apache.carbondata.core.mutate.UpdateVO
import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit}
+import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, CarbonInputSplitTaskInfo}
import org.apache.carbondata.processing.merger._
import org.apache.carbondata.processing.model.CarbonLoadModel
@@ -293,7 +293,7 @@ class CarbonMergerRDD[K, V](
for (eachSeg <- carbonMergerMapping.validSegments) {
// map for keeping the relation of a task and its blocks.
- job.getConfiguration.set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, eachSeg)
+ job.getConfiguration.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, eachSeg)
if (updateStatusManager.getUpdateStatusDetails.length != 0) {
updateDetails = updateStatusManager.getInvalidTimestampRange(eachSeg)
@@ -315,7 +315,8 @@ class CarbonMergerRDD[K, V](
updateStatusManager.getDeleteDeltaFilePath(entry.getPath.toString)
)
((!updated) || ((updated) && (!CarbonUtil
- .isInvalidTableBlock(blockInfo, updateDetails, updateStatusManager))))
+ .isInvalidTableBlock(blockInfo.getSegmentId, blockInfo.getFilePath,
+ updateDetails, updateStatusManager))))
})
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 85c4bc4..c383779 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -39,6 +39,7 @@ import org.apache.carbondata.core.scan.model.QueryModel
import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants, QueryStatisticsRecorder}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
import org.apache.carbondata.hadoop._
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
import org.apache.carbondata.spark.load.CarbonLoaderUtil
@@ -246,22 +247,23 @@ class CarbonScanRDD(
iterator.asInstanceOf[Iterator[InternalRow]]
}
- private def prepareInputFormatForDriver(conf: Configuration): CarbonInputFormat[Object] = {
- CarbonInputFormat.setTableInfo(conf, tableInfo)
+ private def prepareInputFormatForDriver(conf: Configuration): CarbonTableInputFormat[Object] = {
+ CarbonTableInputFormat.setTableInfo(conf, tableInfo)
createInputFormat(conf)
}
- private def prepareInputFormatForExecutor(conf: Configuration): CarbonInputFormat[Object] = {
- CarbonInputFormat.setCarbonReadSupport(conf, readSupport)
- CarbonInputFormat.setTableInfo(conf, getTableInfo)
+ private def prepareInputFormatForExecutor(conf: Configuration): CarbonTableInputFormat[Object] = {
+ CarbonTableInputFormat.setCarbonReadSupport(conf, readSupport)
+ CarbonTableInputFormat.setTableInfo(conf, tableInfo)
createInputFormat(conf)
}
- private def createInputFormat(conf: Configuration): CarbonInputFormat[Object] = {
- val format = new CarbonInputFormat[Object]
- CarbonInputFormat.setTablePath(conf, identifier.appendWithLocalPrefix(identifier.getTablePath))
- CarbonInputFormat.setFilterPredicates(conf, filterExpression)
- CarbonInputFormat.setColumnProjection(conf, columnProjection)
+ private def createInputFormat(conf: Configuration): CarbonTableInputFormat[Object] = {
+ val format = new CarbonTableInputFormat[Object]
+ CarbonTableInputFormat.setTablePath(conf,
+ identifier.appendWithLocalPrefix(identifier.getTablePath))
+ CarbonTableInputFormat.setFilterPredicates(conf, filterExpression)
+ CarbonTableInputFormat.setColumnProjection(conf, columnProjection)
format
}
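For anyone updating a custom integration after this rename: the static setters move one-for-one from CarbonInputFormat to CarbonTableInputFormat under org.apache.carbondata.hadoop.api. A hedged Java sketch of the createInputFormat path above (the real file is Scala; parameter types are inferred from the imports this commit touches):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
    import org.apache.carbondata.core.scan.expression.Expression;
    import org.apache.carbondata.hadoop.CarbonProjection;
    import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;

    // Sketch: mirrors createInputFormat above with the renamed entry points.
    static CarbonTableInputFormat<Object> createInputFormat(Configuration conf,
        AbsoluteTableIdentifier identifier, Expression filterExpression,
        CarbonProjection columnProjection) {
      CarbonTableInputFormat<Object> format = new CarbonTableInputFormat<>();
      CarbonTableInputFormat.setTablePath(conf,
          identifier.appendWithLocalPrefix(identifier.getTablePath()));
      CarbonTableInputFormat.setFilterPredicates(conf, filterExpression);
      CarbonTableInputFormat.setColumnProjection(conf, columnProjection);
      return format;
    }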
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
index 2ca3b8c..4950227 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.hadoop.CarbonInputFormat
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
/**
@@ -38,8 +38,8 @@ object QueryPlanUtil {
* createCarbonInputFormat from query model
*/
def createCarbonInputFormat(absoluteTableIdentifier: AbsoluteTableIdentifier) :
- (CarbonInputFormat[Array[Object]], Job) = {
- val carbonInputFormat = new CarbonInputFormat[Array[Object]]()
+ (CarbonTableInputFormat[Array[Object]], Job) = {
+ val carbonInputFormat = new CarbonTableInputFormat[Array[Object]]()
val jobConf: JobConf = new JobConf(new Configuration)
val job: Job = new Job(jobConf)
FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
@@ -47,8 +47,8 @@ object QueryPlanUtil {
}
def createCarbonInputFormat[V: ClassTag](absoluteTableIdentifier: AbsoluteTableIdentifier,
- conf: Configuration) : CarbonInputFormat[V] = {
- val carbonInputFormat = new CarbonInputFormat[V]()
+ conf: Configuration) : CarbonTableInputFormat[V] = {
+ val carbonInputFormat = new CarbonTableInputFormat[V]()
val job: Job = new Job(conf)
FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
carbonInputFormat
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 71ef6a6..5d931b8 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -38,7 +38,8 @@ import org.apache.spark.util.SerializableConfiguration
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.scan.expression.logical.AndExpression
import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonProjection}
+import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection}
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, SchemaReader}
import org.apache.carbondata.processing.merger.TableMeta
import org.apache.carbondata.spark.{CarbonFilters, CarbonOption}
@@ -90,16 +91,17 @@ private[sql] case class CarbonDatasourceHadoopRelation(
filters.flatMap { filter =>
CarbonFilters.createCarbonFilter(dataSchema, filter)
}.reduceOption(new AndExpression(_, _))
- .foreach(CarbonInputFormat.setFilterPredicates(conf, _))
+ .foreach(CarbonTableInputFormat.setFilterPredicates(conf, _))
val projection = new CarbonProjection
requiredColumns.foreach(projection.addColumn)
- CarbonInputFormat.setColumnProjection(conf, projection)
- CarbonInputFormat.setCarbonReadSupport(conf, classOf[SparkRowReadSupportImpl])
+ CarbonTableInputFormat.setColumnProjection(conf, projection)
+ CarbonTableInputFormat.setCarbonReadSupport(conf, classOf[SparkRowReadSupportImpl])
+
new CarbonHadoopFSRDD[Row](sqlContext.sparkContext,
new SerializableConfiguration(conf),
absIdentifier,
- classOf[CarbonInputFormat[Row]],
+ classOf[CarbonTableInputFormat[Row]],
classOf[Row]
)
}
@@ -119,7 +121,7 @@ class CarbonHadoopFSRDD[V: ClassTag](
@transient sc: SparkContext,
conf: SerializableConfiguration,
identifier: AbsoluteTableIdentifier,
- inputFormatClass: Class[_ <: CarbonInputFormat[V]],
+ inputFormatClass: Class[_ <: CarbonTableInputFormat[V]],
valueClass: Class[V])
extends CarbonRDD[V](sc, Nil) with SparkHadoopMapReduceUtil {
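The buildScan change above keeps the usual filter pushdown pattern: each Spark filter that Carbon can translate becomes one Expression, and the survivors are folded into a single tree with AndExpression before being handed to setFilterPredicates. A small Java sketch of that fold, assuming the AndExpression(Expression, Expression) constructor used in the Scala above:

    import java.util.List;
    import java.util.Optional;
    import org.apache.carbondata.core.scan.expression.Expression;
    import org.apache.carbondata.core.scan.expression.logical.AndExpression;

    // Equivalent of the Scala reduceOption(new AndExpression(_, _)) above:
    // with zero translated filters this yields Optional.empty(), so nothing
    // is pushed down to Carbon.
    static Optional<Expression> combineFilters(List<Expression> translated) {
      return translated.stream()
          .reduce((left, right) -> new AndExpression(left, right));
    }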
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
index a292cde..c38f0e1 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
@@ -133,13 +133,6 @@ private[sql] case class ProjectForUpdateCommand(
override def run(sqlContext: SQLContext): Seq[Row] = {
-
- // sqlContext.sparkContext.setLocalProperty(org.apache.spark.sql.execution.SQLExecution
- // .EXECUTION_ID_KEY, null)
- // DataFrame(sqlContext, plan).show(truncate = false)
- // return Seq.empty
-
-
val res = plan find {
case relation: LogicalRelation if (relation.relation
.isInstanceOf[CarbonDatasourceRelation]) =>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
index 70c7caf..e0a8b58 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.hadoop.CarbonInputFormat
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
/**
* All the utility functions for carbon plan creation
@@ -37,8 +37,8 @@ object QueryPlanUtil {
* createCarbonInputFormat from query model
*/
def createCarbonInputFormat(absoluteTableIdentifier: AbsoluteTableIdentifier) :
- (CarbonInputFormat[Array[Object]], Job) = {
- val carbonInputFormat = new CarbonInputFormat[Array[Object]]()
+ (CarbonTableInputFormat[Array[Object]], Job) = {
+ val carbonInputFormat = new CarbonTableInputFormat[Array[Object]]()
val jobConf: JobConf = new JobConf(new Configuration)
val job: Job = new Job(jobConf)
FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
@@ -46,8 +46,8 @@ object QueryPlanUtil {
}
def createCarbonInputFormat[V: ClassTag](absoluteTableIdentifier: AbsoluteTableIdentifier,
- conf: Configuration) : CarbonInputFormat[V] = {
- val carbonInputFormat = new CarbonInputFormat[V]()
+ conf: Configuration) : CarbonTableInputFormat[V] = {
+ val carbonInputFormat = new CarbonTableInputFormat[V]()
val job: Job = new Job(conf)
FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
carbonInputFormat
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
index 1788ccb..08b8600 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
@@ -18,6 +18,7 @@ package org.apache.carbondata.processing.merger;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -378,4 +379,35 @@ public class CarbonCompactionUtil {
}
return restructuredBlockExists;
}
+
+ /**
+ * This method will check for any restructured block in the blocks selected for compaction
+ *
+   * @param segmentMapping mapping of each segment id to its task-wise block info
+   * @param tableLastUpdatedTime timestamp of the last schema change on the table
+   * @return true if any selected block was written before the last schema change
+ */
+ public static boolean checkIfAnyRestructuredBlockExists(Map<String, TaskBlockInfo> segmentMapping,
+ long tableLastUpdatedTime) {
+ boolean restructuredBlockExists = false;
+ for (Map.Entry<String, TaskBlockInfo> taskMap : segmentMapping.entrySet()) {
+ String segmentId = taskMap.getKey();
+ TaskBlockInfo taskBlockInfo = taskMap.getValue();
+ Collection<List<TableBlockInfo>> infoList = taskBlockInfo.getAllTableBlockInfoList();
+ for (List<TableBlockInfo> listMetadata : infoList) {
+ for (TableBlockInfo blockInfo : listMetadata) {
+ // if schema modified timestamp is greater than footer stored schema timestamp,
+ // it indicates it is a restructured block
+ if (tableLastUpdatedTime > blockInfo.getDetailInfo().getSchemaUpdatedTimeStamp()) {
+ restructuredBlockExists = true;
+ break;
+ }
+ }
+ }
+ if (restructuredBlockExists) {
+ break;
+ }
+ }
+ return restructuredBlockExists;
+ }
}
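For context, a hypothetical caller of the new helper; only checkIfAnyRestructuredBlockExists itself is part of this commit, and the surrounding wiring (how the mapping and timestamp are produced, and TaskBlockInfo living in the same merger package) is an assumption:

    import java.util.Map;

    // Hypothetical caller: pick a restructure-aware merge path when any block
    // selected for compaction predates the table's last schema change.
    static void planCompaction(Map<String, TaskBlockInfo> segmentMapping,
        long tableLastUpdatedTime) {
      if (CarbonCompactionUtil.checkIfAnyRestructuredBlockExists(segmentMapping,
          tableLastUpdatedTime)) {
        // at least one block was written against an older schema, so the
        // merge must handle restructured blocks
      }
    }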