You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/11/18 15:29:56 UTC
[22/28] carbondata git commit: [CARBONDATA-1614][Streaming] Show file format for segment
[CARBONDATA-1614][Streaming] Show file format for segment
This closes #1498
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/ee71610e
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/ee71610e
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/ee71610e
Branch: refs/heads/fgdatamap
Commit: ee71610e1c7686117f3feebab75fdeb82dc31d54
Parents: 91355ef
Author: Jacky Li <ja...@qq.com>
Authored: Wed Nov 15 00:05:03 2017 +0800
Committer: ravipesala <ra...@gmail.com>
Committed: Sat Nov 18 00:28:25 2017 +0530
----------------------------------------------------------------------
.../carbondata/core/statusmanager/FileFormat.java | 17 +++++++++++------
.../core/statusmanager/LoadMetadataDetails.java | 2 +-
.../apache/carbondata/hadoop/CarbonInputSplit.java | 2 +-
.../carbondata/hadoop/CarbonMultiBlockSplit.java | 2 +-
.../hadoop/api/CarbonTableInputFormat.java | 6 +++---
.../streaming/CarbonStreamInputFormatTest.java | 2 +-
.../org/apache/carbondata/api/CarbonStore.scala | 3 ++-
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 12 ++++++------
.../carbondata/spark/rdd/CarbonScanRDD.scala | 6 +++---
.../apache/carbondata/spark/util/CommonUtil.scala | 4 ++--
.../apache/spark/sql/CarbonCatalystOperators.scala | 1 +
.../segmentreading/TestSegmentReading.scala | 2 +-
.../carbondata/TestStreamingTableOperation.scala | 7 ++++---
.../streaming/segment/StreamSegment.java | 6 +++---
14 files changed, 40 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee71610e/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
index 83a4813..c154c5f 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
@@ -18,23 +18,28 @@
package org.apache.carbondata.core.statusmanager;
/**
- * the data file format which was supported
+ * The data file formats supported by the CarbonData project
*/
public enum FileFormat {
- carbondata, rowformat;
+
+ // carbondata columnar file format, optimized for read
+ COLUMNAR_V3,
+
+ // carbondata row file format, optimized for write
+ ROW_V1;
public static FileFormat getByOrdinal(int ordinal) {
if (ordinal < 0 || ordinal >= FileFormat.values().length) {
- return carbondata;
+ return COLUMNAR_V3;
}
switch (ordinal) {
case 0:
- return carbondata;
+ return COLUMNAR_V3;
case 1:
- return rowformat;
+ return ROW_V1;
}
- return carbondata;
+ return COLUMNAR_V3;
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee71610e/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
index b282d53..bb7fc9d 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
@@ -98,7 +98,7 @@ public class LoadMetadataDetails implements Serializable {
/**
* the file format of this segment
*/
- private FileFormat fileFormat = FileFormat.carbondata;
+ private FileFormat fileFormat = FileFormat.COLUMNAR_V3;
public String getPartitionCount() {
return partitionCount;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee71610e/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index f7b372f..e89c2d6 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -82,7 +82,7 @@ public class CarbonInputSplit extends FileSplit
private BlockletDetailInfo detailInfo;
- private FileFormat fileFormat = FileFormat.carbondata;
+ private FileFormat fileFormat = FileFormat.COLUMNAR_V3;
public CarbonInputSplit() {
segmentId = null;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee71610e/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
index d3fa2c2..96fe909 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
@@ -45,7 +45,7 @@ public class CarbonMultiBlockSplit extends InputSplit implements Writable {
*/
private String[] locations;
- private FileFormat fileFormat = FileFormat.carbondata;
+ private FileFormat fileFormat = FileFormat.COLUMNAR_V3;
public CarbonMultiBlockSplit() {
splitList = null;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee71610e/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 552455a..8bf779e 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -475,19 +475,19 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
splits.add(makeSplit(segmentId, path, length - bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
- blkLocations[blkIndex].getCachedHosts(), FileFormat.rowformat));
+ blkLocations[blkIndex].getCachedHosts(), FileFormat.ROW_V1));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
splits.add(makeSplit(segmentId, path, length - bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
- blkLocations[blkIndex].getCachedHosts(), FileFormat.rowformat));
+ blkLocations[blkIndex].getCachedHosts(), FileFormat.ROW_V1));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(segmentId, path, 0, length, new String[0],
- FileFormat.rowformat));
+ FileFormat.ROW_V1));
}
}
} finally {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee71610e/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java
index 9970c50..4f81518 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java
@@ -73,7 +73,7 @@ public class CarbonStreamInputFormatTest extends TestCase {
List<CarbonInputSplit> splitList = new ArrayList<>();
splitList.add(carbonInputSplit);
return new CarbonMultiBlockSplit(identifier, splitList, new String[] { "localhost" },
- FileFormat.rowformat);
+ FileFormat.ROW_V1);
}
@Test public void testCreateRecordReader() {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee71610e/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index a2c9c6d..6c2490e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -29,7 +29,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier}
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
@@ -90,6 +90,7 @@ object CarbonStore {
load.getSegmentStatus.getMessage,
startTime,
endTime,
+ load.getFileFormat.toString,
mergedTo)
}.toSeq
} else {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee71610e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 2a7ca47..aaeedb4 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -306,10 +306,10 @@ class CarbonMergerRDD[K, V](
val splits = format.getSplits(job)
// keep on assigning till last one is reached.
- if (null != splits && splits.size > 0) {
- splitsOfLastSegment = splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
- .filter { split => FileFormat.carbondata.equals(split.getFileFormat) }.toList.asJava
- }
+ if (null != splits && splits.size > 0) splitsOfLastSegment =
+ splits.asScala
+ .map(_.asInstanceOf[CarbonInputSplit])
+ .filter { split => FileFormat.COLUMNAR_V3.equals(split.getFileFormat) }.toList.asJava
carbonInputSplits ++:= splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).filter(entry => {
val blockInfo = new TableBlockInfo(entry.getPath.toString,
@@ -317,10 +317,10 @@ class CarbonMergerRDD[K, V](
entry.getLocations, entry.getLength, entry.getVersion,
updateStatusManager.getDeleteDeltaFilePath(entry.getPath.toString)
)
- (((!updated) || ((updated) && (!CarbonUtil
+ ((!updated) || (updated && (!CarbonUtil
.isInvalidTableBlock(blockInfo.getSegmentId, blockInfo.getFilePath,
updateDetails, updateStatusManager)))) &&
- FileFormat.carbondata.equals(entry.getFileFormat))
+ FileFormat.COLUMNAR_V3.equals(entry.getFileFormat)
})
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee71610e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index a84b040..b24562c 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -94,7 +94,7 @@ class CarbonScanRDD(
val streamSplits = new ArrayBuffer[InputSplit]()
splits.asScala.foreach { split =>
val carbonInputSplit = split.asInstanceOf[CarbonInputSplit]
- if (FileFormat.rowformat == carbonInputSplit.getFileFormat) {
+ if (FileFormat.ROW_V1 == carbonInputSplit.getFileFormat) {
streamSplits += split
} else {
columnarSplits.add(split)
@@ -111,7 +111,7 @@ class CarbonScanRDD(
new CarbonMultiBlockSplit(identifier,
Seq(splitWithIndex._1.asInstanceOf[CarbonInputSplit]).asJava,
splitWithIndex._1.getLocations,
- FileFormat.rowformat)
+ FileFormat.ROW_V1)
new CarbonSparkPartition(id, splitWithIndex._2 + index, multiBlockSplit)
}
if (batchPartitions.isEmpty) {
@@ -250,7 +250,7 @@ class CarbonScanRDD(
val model = format.getQueryModel(inputSplit, attemptContext)
// get RecordReader by FileFormat
val reader: RecordReader[Void, Object] = inputSplit.getFileFormat match {
- case FileFormat.rowformat =>
+ case FileFormat.ROW_V1 =>
// create record reader for row format
DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
val inputFormat = new CarbonStreamInputFormat
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee71610e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index f0b33f4..a922a07 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -859,14 +859,14 @@ object CommonUtil {
CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT).toBoolean) {
new CarbonMergeFilesRDD(sparkContext, AbsoluteTableIdentifier.from(tablePath,
- carbonTable.getDatabaseName, carbonTable.getFactTableName).getTablePath,
+ carbonTable.getDatabaseName, carbonTable.getTableName).getTablePath,
segmentIds).collect()
}
} catch {
case _: Exception =>
if (CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT.toBoolean) {
new CarbonMergeFilesRDD(sparkContext, AbsoluteTableIdentifier.from(tablePath,
- carbonTable.getDatabaseName, carbonTable.getFactTableName).getTablePath,
+ carbonTable.getDatabaseName, carbonTable.getTableName).getTablePath,
segmentIds).collect()
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee71610e/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index 62632df..f8a5404 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -123,6 +123,7 @@ case class ShowLoadsCommand(
AttributeReference("Status", StringType, nullable = false)(),
AttributeReference("Load Start Time", TimestampType, nullable = false)(),
AttributeReference("Load End Time", TimestampType, nullable = true)(),
+ AttributeReference("File Format", StringType, nullable = false)(),
AttributeReference("Merged To", StringType, nullable = false)())
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee71610e/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
index ac3fa5c..b23ba2c 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
@@ -232,7 +232,7 @@ class TestSegmentReading extends QueryTest with BeforeAndAfterAll {
|('DELIMITER'= ',', 'QUOTECHAR'= '\"')""".stripMargin)
val df = sql("SHOW SEGMENTS for table carbon_table_show_seg")
val col = df.collect().map{
- row => Row(row.getString(0),row.getString(1),row.getString(4))
+ row => Row(row.getString(0),row.getString(1),row.getString(5))
}.toSeq
assert(col.equals(Seq(Row("2","Success","NA"),
Row("1","Compacted","0.1"),
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee71610e/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index b29cca4..33aa2c9 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -32,8 +32,7 @@ import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.sql.types.StructType
import org.scalatest.BeforeAndAfterAll
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.statusmanager.SegmentStatus
+import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
@@ -561,11 +560,13 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
}
sql("alter table streaming.stream_table_compact compact 'minor'")
+ sql("show segments for table streaming.stream_table_compact").show
val result = sql("show segments for table streaming.stream_table_compact").collect()
result.foreach { row =>
if (row.getString(0).equals("1")) {
assertResult(SegmentStatus.STREAMING.getMessage)(row.getString(1))
+ assertResult(FileFormat.ROW_V1.toString)(row.getString(4))
}
}
}
@@ -583,7 +584,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
badRecordAction = "force",
handoffSize = 1024L * 200
)
- assert(sql("show segments for table streaming.stream_table_new").count() == 4)
+ assert(sql("show segments for table streaming.stream_table_new").count() > 1)
checkAnswer(
sql("select count(*) from streaming.stream_table_new"),
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee71610e/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
index 7682437..0187597 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
@@ -73,7 +73,7 @@ public class StreamSegment {
SegmentStatusManager.readLoadMetadata(tablePath.getMetadataDirectoryPath());
LoadMetadataDetails streamSegment = null;
for (LoadMetadataDetails detail : details) {
- if (FileFormat.rowformat == detail.getFileFormat()) {
+ if (FileFormat.ROW_V1 == detail.getFileFormat()) {
if (SegmentStatus.STREAMING == detail.getSegmentStatus()) {
streamSegment = detail;
break;
@@ -85,7 +85,7 @@ public class StreamSegment {
LoadMetadataDetails newDetail = new LoadMetadataDetails();
newDetail.setPartitionCount("0");
newDetail.setLoadName("" + segmentId);
- newDetail.setFileFormat(FileFormat.rowformat);
+ newDetail.setFileFormat(FileFormat.ROW_V1);
newDetail.setLoadStartTime(System.currentTimeMillis());
newDetail.setSegmentStatus(SegmentStatus.STREAMING);
@@ -149,7 +149,7 @@ public class StreamSegment {
LoadMetadataDetails newDetail = new LoadMetadataDetails();
newDetail.setPartitionCount("0");
newDetail.setLoadName("" + newSegmentId);
- newDetail.setFileFormat(FileFormat.rowformat);
+ newDetail.setFileFormat(FileFormat.ROW_V1);
newDetail.setLoadStartTime(System.currentTimeMillis());
newDetail.setSegmentStatus(SegmentStatus.STREAMING);