Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/07/27 12:42:04 UTC

[4/7] carbondata git commit: [CARBONDATA-1232] Datamap implementation for Blocklet

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index e4d3ba5..8aa18d5 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -17,14 +17,31 @@
 
 package org.apache.carbondata.presto.impl;
 
-import com.facebook.presto.spi.SchemaTableName;
-import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.inject.Inject;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.*;
-import org.apache.carbondata.core.datastore.block.*;
+import org.apache.carbondata.core.datastore.DataRefNode;
+import org.apache.carbondata.core.datastore.DataRefNodeFinder;
+import org.apache.carbondata.core.datastore.IndexKey;
+import org.apache.carbondata.core.datastore.SegmentTaskIndexStore;
+import org.apache.carbondata.core.datastore.TableSegmentUniqueIdentifier;
+import org.apache.carbondata.core.datastore.block.AbstractIndex;
+import org.apache.carbondata.core.datastore.block.BlockletInfos;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.block.SegmentTaskIndexWrapper;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.datastore.exception.IndexBuilderException;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -52,18 +69,24 @@ import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.hadoop.CacheClient;
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+
+import com.facebook.presto.spi.SchemaTableName;
+import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.Inject;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.thrift.TBase;
 
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
 import static java.util.Objects.requireNonNull;
 import com.facebook.presto.spi.TableNotFoundException;
 
@@ -392,8 +415,9 @@ public class CarbonTableReader {
           TableBlockInfo tableBlockInfo = leafNode.getTableBlockInfo();
 
           if (IUDTable) {
-            if (CarbonUtil.isInvalidTableBlock(tableBlockInfo, invalidBlockVOForSegmentId,
-                updateStatusManager)) {
+            if (CarbonUtil
+                .isInvalidTableBlock(tableBlockInfo.getSegmentId(), tableBlockInfo.getFilePath(),
+                    invalidBlockVOForSegmentId, updateStatusManager)) {
               continue;
             }
           }
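
The call above now passes the segment id and file path to
CarbonUtil.isInvalidTableBlock instead of the whole TableBlockInfo. A minimal
Java sketch of a caller under that assumed signature (the helper name and the
surrounding variables are illustrative, not part of this commit):

    import org.apache.carbondata.core.datastore.block.TableBlockInfo;
    import org.apache.carbondata.core.mutate.UpdateVO;
    import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
    import org.apache.carbondata.core.util.CarbonUtil;

    // Returns true when the block was invalidated by an update/delete (IUD)
    // operation and should be skipped, mirroring the check in the hunk above.
    static boolean shouldSkipBlock(TableBlockInfo blockInfo, UpdateVO invalidBlockVO,
        SegmentUpdateStatusManager updateStatusManager) {
      return CarbonUtil.isInvalidTableBlock(blockInfo.getSegmentId(),
          blockInfo.getFilePath(), invalidBlockVO, updateStatusManager);
    }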

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
index 277005b..da0d082 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
@@ -29,7 +29,8 @@ import org.apache.spark.sql.execution.command.CarbonMergerMapping
 
 import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
-import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit}
+import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
 import org.apache.carbondata.processing.model.CarbonLoadModel
@@ -71,7 +72,7 @@ class CarbonIUDMergerRDD[K, V](
 
     var blocksOfLastSegment: List[TableBlockInfo] = null
 
-    CarbonInputFormat.setSegmentsToAccess(
+    CarbonTableInputFormat.setSegmentsToAccess(
       job.getConfiguration, carbonMergerMapping.validSegments.toList.asJava)
 
     // get splits

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 5c0e77d..1a8183c 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -43,8 +43,8 @@ import org.apache.carbondata.core.mutate.UpdateVO
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit}
+import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, CarbonInputSplitTaskInfo}
 import org.apache.carbondata.processing.merger._
 import org.apache.carbondata.processing.model.CarbonLoadModel
@@ -293,7 +293,7 @@ class CarbonMergerRDD[K, V](
     for (eachSeg <- carbonMergerMapping.validSegments) {
 
       // map for keeping the relation of a task and its blocks.
-      job.getConfiguration.set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, eachSeg)
+      job.getConfiguration.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, eachSeg)
 
       if (updateStatusManager.getUpdateStatusDetails.length != 0) {
          updateDetails = updateStatusManager.getInvalidTimestampRange(eachSeg)
@@ -315,7 +315,8 @@ class CarbonMergerRDD[K, V](
           updateStatusManager.getDeleteDeltaFilePath(entry.getPath.toString)
         )
         ((!updated) || ((updated) && (!CarbonUtil
-          .isInvalidTableBlock(blockInfo, updateDetails, updateStatusManager))))
+          .isInvalidTableBlock(blockInfo.getSegmentId, blockInfo.getFilePath,
+            updateDetails, updateStatusManager))))
       })
     }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 85c4bc4..c383779 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -39,6 +39,7 @@ import org.apache.carbondata.core.scan.model.QueryModel
 import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants, QueryStatisticsRecorder}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
 import org.apache.carbondata.hadoop._
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 import org.apache.carbondata.spark.load.CarbonLoaderUtil
 
 
@@ -246,22 +247,23 @@ class CarbonScanRDD(
     iterator.asInstanceOf[Iterator[InternalRow]]
   }
 
-  private def prepareInputFormatForDriver(conf: Configuration): CarbonInputFormat[Object] = {
-    CarbonInputFormat.setTableInfo(conf, tableInfo)
+  private def prepareInputFormatForDriver(conf: Configuration): CarbonTableInputFormat[Object] = {
+    CarbonTableInputFormat.setTableInfo(conf, tableInfo)
     createInputFormat(conf)
   }
 
-  private def prepareInputFormatForExecutor(conf: Configuration): CarbonInputFormat[Object] = {
-    CarbonInputFormat.setCarbonReadSupport(conf, readSupport)
-    CarbonInputFormat.setTableInfo(conf, getTableInfo)
+  private def prepareInputFormatForExecutor(conf: Configuration): CarbonTableInputFormat[Object] = {
+    CarbonTableInputFormat.setCarbonReadSupport(conf, readSupport)
+    CarbonTableInputFormat.setTableInfo(conf, tableInfo)
     createInputFormat(conf)
   }
 
-  private def createInputFormat(conf: Configuration): CarbonInputFormat[Object] = {
-    val format = new CarbonInputFormat[Object]
-    CarbonInputFormat.setTablePath(conf, identifier.appendWithLocalPrefix(identifier.getTablePath))
-    CarbonInputFormat.setFilterPredicates(conf, filterExpression)
-    CarbonInputFormat.setColumnProjection(conf, columnProjection)
+  private def createInputFormat(conf: Configuration): CarbonTableInputFormat[Object] = {
+    val format = new CarbonTableInputFormat[Object]
+    CarbonTableInputFormat.setTablePath(conf,
+      identifier.appendWithLocalPrefix(identifier.getTablePath))
+    CarbonTableInputFormat.setFilterPredicates(conf, filterExpression)
+    CarbonTableInputFormat.setColumnProjection(conf, columnProjection)
     format
   }
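
The Scala changes in this file only swap CarbonInputFormat for
CarbonTableInputFormat (now under org.apache.carbondata.hadoop.api); the static
configuration helpers are invoked the same way. A minimal Java sketch of the
equivalent setup, assuming the setter signatures implied by the call sites
above (the filter type and helper name are assumptions):

    import org.apache.carbondata.core.scan.expression.Expression;
    import org.apache.carbondata.hadoop.CarbonProjection;
    import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
    import org.apache.hadoop.conf.Configuration;

    // Configure and create a CarbonTableInputFormat, mirroring createInputFormat above.
    static CarbonTableInputFormat<Object> newInputFormat(Configuration conf,
        String tablePath, Expression filter, CarbonProjection projection) {
      CarbonTableInputFormat.setTablePath(conf, tablePath);          // table location
      CarbonTableInputFormat.setFilterPredicates(conf, filter);      // pushed-down filter
      CarbonTableInputFormat.setColumnProjection(conf, projection);  // required columns
      return new CarbonTableInputFormat<Object>();
    }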
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
index 2ca3b8c..4950227 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.hadoop.CarbonInputFormat
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 
 
 /**
@@ -38,8 +38,8 @@ object QueryPlanUtil {
    * createCarbonInputFormat from query model
    */
   def createCarbonInputFormat(absoluteTableIdentifier: AbsoluteTableIdentifier) :
-  (CarbonInputFormat[Array[Object]], Job) = {
-    val carbonInputFormat = new CarbonInputFormat[Array[Object]]()
+  (CarbonTableInputFormat[Array[Object]], Job) = {
+    val carbonInputFormat = new CarbonTableInputFormat[Array[Object]]()
     val jobConf: JobConf = new JobConf(new Configuration)
     val job: Job = new Job(jobConf)
     FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
@@ -47,8 +47,8 @@ object QueryPlanUtil {
   }
 
   def createCarbonInputFormat[V: ClassTag](absoluteTableIdentifier: AbsoluteTableIdentifier,
-      conf: Configuration) : CarbonInputFormat[V] = {
-    val carbonInputFormat = new CarbonInputFormat[V]()
+      conf: Configuration) : CarbonTableInputFormat[V] = {
+    val carbonInputFormat = new CarbonTableInputFormat[V]()
     val job: Job = new Job(conf)
     FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
     carbonInputFormat

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 71ef6a6..5d931b8 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -38,7 +38,8 @@ import org.apache.spark.util.SerializableConfiguration
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.scan.expression.logical.AndExpression
 import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonProjection}
+import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection}
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, SchemaReader}
 import org.apache.carbondata.processing.merger.TableMeta
 import org.apache.carbondata.spark.{CarbonFilters, CarbonOption}
@@ -90,16 +91,17 @@ private[sql] case class CarbonDatasourceHadoopRelation(
     filters.flatMap { filter =>
       CarbonFilters.createCarbonFilter(dataSchema, filter)
     }.reduceOption(new AndExpression(_, _))
-      .foreach(CarbonInputFormat.setFilterPredicates(conf, _))
+      .foreach(CarbonTableInputFormat.setFilterPredicates(conf, _))
 
     val projection = new CarbonProjection
     requiredColumns.foreach(projection.addColumn)
-    CarbonInputFormat.setColumnProjection(conf, projection)
-    CarbonInputFormat.setCarbonReadSupport(conf, classOf[SparkRowReadSupportImpl])
+    CarbonTableInputFormat.setColumnProjection(conf, projection)
+    CarbonTableInputFormat.setCarbonReadSupport(conf, classOf[SparkRowReadSupportImpl])
+
     new CarbonHadoopFSRDD[Row](sqlContext.sparkContext,
       new SerializableConfiguration(conf),
       absIdentifier,
-      classOf[CarbonInputFormat[Row]],
+      classOf[CarbonTableInputFormat[Row]],
       classOf[Row]
     )
   }
@@ -119,7 +121,7 @@ class CarbonHadoopFSRDD[V: ClassTag](
   @transient sc: SparkContext,
   conf: SerializableConfiguration,
   identifier: AbsoluteTableIdentifier,
-  inputFormatClass: Class[_ <: CarbonInputFormat[V]],
+  inputFormatClass: Class[_ <: CarbonTableInputFormat[V]],
   valueClass: Class[V])
   extends CarbonRDD[V](sc, Nil) with SparkHadoopMapReduceUtil {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
index a292cde..c38f0e1 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
@@ -133,13 +133,6 @@ private[sql] case class ProjectForUpdateCommand(
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
 
-
-   //  sqlContext.sparkContext.setLocalProperty(org.apache.spark.sql.execution.SQLExecution
-    //  .EXECUTION_ID_KEY, null)
-    // DataFrame(sqlContext, plan).show(truncate = false)
-    // return Seq.empty
-
-
     val res = plan find {
       case relation: LogicalRelation if (relation.relation
         .isInstanceOf[CarbonDatasourceRelation]) =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
index 70c7caf..e0a8b58 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.hadoop.CarbonInputFormat
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 
 /**
  * All the utility functions for carbon plan creation
@@ -37,8 +37,8 @@ object QueryPlanUtil {
    * createCarbonInputFormat from query model
    */
   def createCarbonInputFormat(absoluteTableIdentifier: AbsoluteTableIdentifier) :
-  (CarbonInputFormat[Array[Object]], Job) = {
-    val carbonInputFormat = new CarbonInputFormat[Array[Object]]()
+  (CarbonTableInputFormat[Array[Object]], Job) = {
+    val carbonInputFormat = new CarbonTableInputFormat[Array[Object]]()
     val jobConf: JobConf = new JobConf(new Configuration)
     val job: Job = new Job(jobConf)
     FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
@@ -46,8 +46,8 @@ object QueryPlanUtil {
   }
 
   def createCarbonInputFormat[V: ClassTag](absoluteTableIdentifier: AbsoluteTableIdentifier,
-      conf: Configuration) : CarbonInputFormat[V] = {
-    val carbonInputFormat = new CarbonInputFormat[V]()
+      conf: Configuration) : CarbonTableInputFormat[V] = {
+    val carbonInputFormat = new CarbonTableInputFormat[V]()
     val job: Job = new Job(conf)
     FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
     carbonInputFormat

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
index 1788ccb..08b8600 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
@@ -18,6 +18,7 @@ package org.apache.carbondata.processing.merger;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -378,4 +379,35 @@ public class CarbonCompactionUtil {
     }
     return restructuredBlockExists;
   }
+
+  /**
+   * This method will check for any restructured block in the blocks selected for compaction
+   *
+   * @param segmentMapping map of segment id to the blocks of that segment, grouped by task
+   * @param tableLastUpdatedTime latest schema modified timestamp of the table
+   * @return true if any of the selected blocks was written with an older schema
+   */
+  public static boolean checkIfAnyRestructuredBlockExists(Map<String, TaskBlockInfo> segmentMapping,
+      long tableLastUpdatedTime) {
+    boolean restructuredBlockExists = false;
+    for (Map.Entry<String, TaskBlockInfo> taskMap : segmentMapping.entrySet()) {
+      String segmentId = taskMap.getKey();
+      TaskBlockInfo taskBlockInfo = taskMap.getValue();
+      Collection<List<TableBlockInfo>> infoList = taskBlockInfo.getAllTableBlockInfoList();
+      for (List<TableBlockInfo> listMetadata : infoList) {
+        for (TableBlockInfo blockInfo : listMetadata) {
+          // if schema modified timestamp is greater than footer stored schema timestamp,
+          // it indicates it is a restructured block
+          if (tableLastUpdatedTime > blockInfo.getDetailInfo().getSchemaUpdatedTimeStamp()) {
+            restructuredBlockExists = true;
+            break;
+          }
+        }
+      }
+      if (restructuredBlockExists) {
+        break;
+      }
+    }
+    return restructuredBlockExists;
+  }
 }
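
A short usage sketch of the new helper, assuming a compaction caller that
already holds the segment-to-task mapping (names other than the helper itself
are illustrative; the TaskBlockInfo package is assumed to match this class):

    import java.util.Map;

    import org.apache.carbondata.processing.merger.CarbonCompactionUtil;
    import org.apache.carbondata.processing.merger.TaskBlockInfo;

    // Decide whether this compaction must handle restructured (older-schema) blocks,
    // i.e. blocks whose footer schema timestamp is older than the table's last update.
    static boolean needsRestructureHandling(Map<String, TaskBlockInfo> segmentMapping,
        long tableLastUpdatedTime) {
      return CarbonCompactionUtil.checkIfAnyRestructuredBlockExists(segmentMapping,
          tableLastUpdatedTime);
    }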