Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/11/18 15:29:56 UTC

[22/28] carbondata git commit: [CARBONDATA-1614][Streaming] Show file format for segment

[CARBONDATA-1614][Streaming] Show file format for segment

This closes #1498


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/ee71610e
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/ee71610e
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/ee71610e

Branch: refs/heads/fgdatamap
Commit: ee71610e1c7686117f3feebab75fdeb82dc31d54
Parents: 91355ef
Author: Jacky Li <ja...@qq.com>
Authored: Wed Nov 15 00:05:03 2017 +0800
Committer: ravipesala <ra...@gmail.com>
Committed: Sat Nov 18 00:28:25 2017 +0530

----------------------------------------------------------------------
 .../carbondata/core/statusmanager/FileFormat.java  | 17 +++++++++++------
 .../core/statusmanager/LoadMetadataDetails.java    |  2 +-
 .../apache/carbondata/hadoop/CarbonInputSplit.java |  2 +-
 .../carbondata/hadoop/CarbonMultiBlockSplit.java   |  2 +-
 .../hadoop/api/CarbonTableInputFormat.java         |  6 +++---
 .../streaming/CarbonStreamInputFormatTest.java     |  2 +-
 .../org/apache/carbondata/api/CarbonStore.scala    |  3 ++-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala     | 12 ++++++------
 .../carbondata/spark/rdd/CarbonScanRDD.scala       |  6 +++---
 .../apache/carbondata/spark/util/CommonUtil.scala  |  4 ++--
 .../apache/spark/sql/CarbonCatalystOperators.scala |  1 +
 .../segmentreading/TestSegmentReading.scala        |  2 +-
 .../carbondata/TestStreamingTableOperation.scala   |  7 ++++---
 .../streaming/segment/StreamSegment.java           |  6 +++---
 14 files changed, 40 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee71610e/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
index 83a4813..c154c5f 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
@@ -18,23 +18,28 @@
 package org.apache.carbondata.core.statusmanager;
 
 /**
- * the data file format which was supported
+ * The data file format supported in carbondata project
  */
 public enum FileFormat {
-  carbondata, rowformat;
+
+  // carbondata columnar file format, optimized for read
+  COLUMNAR_V3,
+
+  // carbondata row file format, optimized for write
+  ROW_V1;
 
   public static FileFormat getByOrdinal(int ordinal) {
     if (ordinal < 0 || ordinal >= FileFormat.values().length) {
-      return carbondata;
+      return COLUMNAR_V3;
     }
 
     switch (ordinal) {
       case 0:
-        return carbondata;
+        return COLUMNAR_V3;
       case 1:
-        return rowformat;
+        return ROW_V1;
     }
 
-    return carbondata;
+    return COLUMNAR_V3;
   }
 }
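
For readers of this hunk, a minimal Scala sketch (illustrative only, not part of the patch) of how the renamed constants behave: ordinal 0 maps to the columnar format, ordinal 1 to the row format, and any out-of-range ordinal falls back to COLUMNAR_V3.

    import org.apache.carbondata.core.statusmanager.FileFormat

    // ordinal 0 -> columnar, ordinal 1 -> row
    assert(FileFormat.getByOrdinal(0) == FileFormat.COLUMNAR_V3)
    assert(FileFormat.getByOrdinal(1) == FileFormat.ROW_V1)
    // out-of-range ordinals fall back to the columnar format
    assert(FileFormat.getByOrdinal(-1) == FileFormat.COLUMNAR_V3)
    assert(FileFormat.getByOrdinal(99) == FileFormat.COLUMNAR_V3)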

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee71610e/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
index b282d53..bb7fc9d 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
@@ -98,7 +98,7 @@ public class LoadMetadataDetails implements Serializable {
   /**
    * the file format of this segment
    */
-  private FileFormat fileFormat = FileFormat.carbondata;
+  private FileFormat fileFormat = FileFormat.COLUMNAR_V3;
 
   public String getPartitionCount() {
     return partitionCount;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee71610e/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index f7b372f..e89c2d6 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -82,7 +82,7 @@ public class CarbonInputSplit extends FileSplit
 
   private BlockletDetailInfo detailInfo;
 
-  private FileFormat fileFormat = FileFormat.carbondata;
+  private FileFormat fileFormat = FileFormat.COLUMNAR_V3;
 
   public CarbonInputSplit() {
     segmentId = null;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee71610e/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
index d3fa2c2..96fe909 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
@@ -45,7 +45,7 @@ public class CarbonMultiBlockSplit extends InputSplit implements Writable {
    */
   private String[] locations;
 
-  private FileFormat fileFormat = FileFormat.carbondata;
+  private FileFormat fileFormat = FileFormat.COLUMNAR_V3;
 
   public CarbonMultiBlockSplit() {
     splitList = null;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee71610e/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 552455a..8bf779e 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -475,19 +475,19 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
                     int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
                     splits.add(makeSplit(segmentId, path, length - bytesRemaining, splitSize,
                         blkLocations[blkIndex].getHosts(),
-                        blkLocations[blkIndex].getCachedHosts(), FileFormat.rowformat));
+                        blkLocations[blkIndex].getCachedHosts(), FileFormat.ROW_V1));
                     bytesRemaining -= splitSize;
                   }
                   if (bytesRemaining != 0) {
                     int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
                     splits.add(makeSplit(segmentId, path, length - bytesRemaining, bytesRemaining,
                         blkLocations[blkIndex].getHosts(),
-                        blkLocations[blkIndex].getCachedHosts(), FileFormat.rowformat));
+                        blkLocations[blkIndex].getCachedHosts(), FileFormat.ROW_V1));
                   }
                 } else {
                   //Create empty hosts array for zero length files
                   splits.add(makeSplit(segmentId, path, 0, length, new String[0],
-                      FileFormat.rowformat));
+                      FileFormat.ROW_V1));
                 }
               }
             } finally {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee71610e/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java
index 9970c50..4f81518 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java
@@ -73,7 +73,7 @@ public class CarbonStreamInputFormatTest extends TestCase {
     List<CarbonInputSplit> splitList = new ArrayList<>();
     splitList.add(carbonInputSplit);
     return new CarbonMultiBlockSplit(identifier, splitList, new String[] { "localhost" },
-        FileFormat.rowformat);
+        FileFormat.ROW_V1);
   }
 
   @Test public void testCreateRecordReader() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee71610e/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index a2c9c6d..6c2490e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -29,7 +29,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier}
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
@@ -90,6 +90,7 @@ object CarbonStore {
             load.getSegmentStatus.getMessage,
             startTime,
             endTime,
+            load.getFileFormat.toString,
             mergedTo)
         }.toSeq
     } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee71610e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 2a7ca47..aaeedb4 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -306,10 +306,10 @@ class CarbonMergerRDD[K, V](
       val splits = format.getSplits(job)
 
       // keep on assigning till last one is reached.
-      if (null != splits && splits.size > 0) {
-        splitsOfLastSegment = splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
-          .filter { split => FileFormat.carbondata.equals(split.getFileFormat) }.toList.asJava
-      }
+      if (null != splits && splits.size > 0) splitsOfLastSegment =
+        splits.asScala
+          .map(_.asInstanceOf[CarbonInputSplit])
+          .filter { split => FileFormat.COLUMNAR_V3.equals(split.getFileFormat) }.toList.asJava
 
       carbonInputSplits ++:= splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).filter(entry => {
         val blockInfo = new TableBlockInfo(entry.getPath.toString,
@@ -317,10 +317,10 @@ class CarbonMergerRDD[K, V](
           entry.getLocations, entry.getLength, entry.getVersion,
           updateStatusManager.getDeleteDeltaFilePath(entry.getPath.toString)
         )
-        (((!updated) || ((updated) && (!CarbonUtil
+        ((!updated) || (updated && (!CarbonUtil
           .isInvalidTableBlock(blockInfo.getSegmentId, blockInfo.getFilePath,
             updateDetails, updateStatusManager)))) &&
-         FileFormat.carbondata.equals(entry.getFileFormat))
+        FileFormat.COLUMNAR_V3.equals(entry.getFileFormat)
       })
     }
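
In isolation, the filter above amounts to the following sketch (assuming splits is the java.util.List returned by format.getSplits(job)): compaction keeps only columnar splits, so streaming segments are never merged here.

    import scala.collection.JavaConverters._
    import org.apache.carbondata.core.statusmanager.FileFormat
    import org.apache.carbondata.hadoop.CarbonInputSplit

    // keep only columnar (COLUMNAR_V3) splits; streaming (ROW_V1) splits are skipped
    val columnarOnly = splits.asScala
      .map(_.asInstanceOf[CarbonInputSplit])
      .filter(s => FileFormat.COLUMNAR_V3 == s.getFileFormat)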
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee71610e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index a84b040..b24562c 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -94,7 +94,7 @@ class CarbonScanRDD(
     val streamSplits = new ArrayBuffer[InputSplit]()
     splits.asScala.foreach { split =>
       val carbonInputSplit = split.asInstanceOf[CarbonInputSplit]
-      if (FileFormat.rowformat == carbonInputSplit.getFileFormat) {
+      if (FileFormat.ROW_V1 == carbonInputSplit.getFileFormat) {
         streamSplits += split
       } else {
         columnarSplits.add(split)
@@ -111,7 +111,7 @@ class CarbonScanRDD(
             new CarbonMultiBlockSplit(identifier,
               Seq(splitWithIndex._1.asInstanceOf[CarbonInputSplit]).asJava,
               splitWithIndex._1.getLocations,
-              FileFormat.rowformat)
+              FileFormat.ROW_V1)
           new CarbonSparkPartition(id, splitWithIndex._2 + index, multiBlockSplit)
         }
       if (batchPartitions.isEmpty) {
@@ -250,7 +250,7 @@ class CarbonScanRDD(
       val model = format.getQueryModel(inputSplit, attemptContext)
       // get RecordReader by FileFormat
       val reader: RecordReader[Void, Object] = inputSplit.getFileFormat match {
-        case FileFormat.rowformat =>
+        case FileFormat.ROW_V1 =>
           // create record reader for row format
           DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
           val inputFormat = new CarbonStreamInputFormat
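
The routing in the first hunk of this file can be sketched with a single partition call (equivalent in effect, assuming splits is the java.util.List of InputSplit being iterated): ROW_V1 splits go to the streaming record reader, everything else to the columnar reader.

    import scala.collection.JavaConverters._
    import org.apache.carbondata.core.statusmanager.FileFormat
    import org.apache.carbondata.hadoop.CarbonInputSplit

    // split the work by file format: streaming vs. columnar
    val (streamSplits, columnarSplits) = splits.asScala.partition { split =>
      FileFormat.ROW_V1 == split.asInstanceOf[CarbonInputSplit].getFileFormat
    }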

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee71610e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index f0b33f4..a922a07 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -859,14 +859,14 @@ object CommonUtil {
           CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
           CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT).toBoolean) {
           new CarbonMergeFilesRDD(sparkContext, AbsoluteTableIdentifier.from(tablePath,
-            carbonTable.getDatabaseName, carbonTable.getFactTableName).getTablePath,
+            carbonTable.getDatabaseName, carbonTable.getTableName).getTablePath,
             segmentIds).collect()
         }
       } catch {
         case _: Exception =>
           if (CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT.toBoolean) {
             new CarbonMergeFilesRDD(sparkContext, AbsoluteTableIdentifier.from(tablePath,
-              carbonTable.getDatabaseName, carbonTable.getFactTableName).getTablePath,
+              carbonTable.getDatabaseName, carbonTable.getTableName).getTablePath,
               segmentIds).collect()
           }
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee71610e/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index 62632df..f8a5404 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -123,6 +123,7 @@ case class ShowLoadsCommand(
       AttributeReference("Status", StringType, nullable = false)(),
       AttributeReference("Load Start Time", TimestampType, nullable = false)(),
       AttributeReference("Load End Time", TimestampType, nullable = true)(),
+      AttributeReference("File Format", StringType, nullable = false)(),
       AttributeReference("Merged To", StringType, nullable = false)())
   }
 }
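
With the new attribute, SHOW SEGMENTS returns a "File Format" column between "Load End Time" and "Merged To", so "Merged To" shifts from index 4 to index 5. A usage sketch (the table name is a placeholder; a SparkSession's sql is assumed in scope, as in the tests below):

    val rows = sql("SHOW SEGMENTS FOR TABLE some_db.some_table").collect()
    rows.foreach { row =>
      val fileFormat = row.getString(4) // "COLUMNAR_V3" or "ROW_V1"
      val mergedTo   = row.getString(5) // was row.getString(4) before this change
    }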

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee71610e/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
index ac3fa5c..b23ba2c 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
@@ -232,7 +232,7 @@ class TestSegmentReading extends QueryTest with BeforeAndAfterAll {
             |('DELIMITER'= ',', 'QUOTECHAR'= '\"')""".stripMargin)
       val df = sql("SHOW SEGMENTS for table carbon_table_show_seg")
       val col = df.collect().map{
-        row => Row(row.getString(0),row.getString(1),row.getString(4))
+        row => Row(row.getString(0),row.getString(1),row.getString(5))
       }.toSeq
       assert(col.equals(Seq(Row("2","Success","NA"),
         Row("1","Compacted","0.1"),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee71610e/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index b29cca4..33aa2c9 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -32,8 +32,7 @@ import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.sql.types.StructType
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.statusmanager.SegmentStatus
+import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
@@ -561,11 +560,13 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     }
 
     sql("alter table streaming.stream_table_compact compact 'minor'")
+    sql("show segments for table streaming.stream_table_compact").show
 
     val result = sql("show segments for table streaming.stream_table_compact").collect()
     result.foreach { row =>
       if (row.getString(0).equals("1")) {
         assertResult(SegmentStatus.STREAMING.getMessage)(row.getString(1))
+        assertResult(FileFormat.ROW_V1.toString)(row.getString(4))
       }
     }
   }
@@ -583,7 +584,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       badRecordAction = "force",
       handoffSize = 1024L * 200
     )
-    assert(sql("show segments for table streaming.stream_table_new").count() == 4)
+    assert(sql("show segments for table streaming.stream_table_new").count() > 1)
 
     checkAnswer(
       sql("select count(*) from streaming.stream_table_new"),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee71610e/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
index 7682437..0187597 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
@@ -73,7 +73,7 @@ public class StreamSegment {
             SegmentStatusManager.readLoadMetadata(tablePath.getMetadataDirectoryPath());
         LoadMetadataDetails streamSegment = null;
         for (LoadMetadataDetails detail : details) {
-          if (FileFormat.rowformat == detail.getFileFormat()) {
+          if (FileFormat.ROW_V1 == detail.getFileFormat()) {
             if (SegmentStatus.STREAMING == detail.getSegmentStatus()) {
               streamSegment = detail;
               break;
@@ -85,7 +85,7 @@ public class StreamSegment {
           LoadMetadataDetails newDetail = new LoadMetadataDetails();
           newDetail.setPartitionCount("0");
           newDetail.setLoadName("" + segmentId);
-          newDetail.setFileFormat(FileFormat.rowformat);
+          newDetail.setFileFormat(FileFormat.ROW_V1);
           newDetail.setLoadStartTime(System.currentTimeMillis());
           newDetail.setSegmentStatus(SegmentStatus.STREAMING);
 
@@ -149,7 +149,7 @@ public class StreamSegment {
         LoadMetadataDetails newDetail = new LoadMetadataDetails();
         newDetail.setPartitionCount("0");
         newDetail.setLoadName("" + newSegmentId);
-        newDetail.setFileFormat(FileFormat.rowformat);
+        newDetail.setFileFormat(FileFormat.ROW_V1);
         newDetail.setLoadStartTime(System.currentTimeMillis());
         newDetail.setSegmentStatus(SegmentStatus.STREAMING);
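
The first hunk of this file implements the lookup sketched below (metadataDir is a placeholder for tablePath.getMetadataDirectoryPath()): the open streaming segment is the ROW_V1 entry whose status is STREAMING.

    import org.apache.carbondata.core.statusmanager.{FileFormat, LoadMetadataDetails, SegmentStatus, SegmentStatusManager}

    // scan the table status entries for the active streaming segment
    val details: Array[LoadMetadataDetails] = SegmentStatusManager.readLoadMetadata(metadataDir)
    val streamSegment: Option[LoadMetadataDetails] = details.find { d =>
      FileFormat.ROW_V1 == d.getFileFormat && SegmentStatus.STREAMING == d.getSegmentStatus
    }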