Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/04/08 06:44:51 UTC

carbondata git commit: [CARBONDATA-2299] Support showing all segment information (including visible and invisible segments)

Repository: carbondata
Updated Branches:
  refs/heads/master f6990d622 -> 57c54fb7f


[CARBONDATA-2299] Support showing all segment information (including visible and invisible segments)

Use the command 'SHOW HISTORY SEGMENTS' to show all segment information (including visible and invisible segments).

Assert the segment name, status, and visibility in the collected result.

Add documentation for the 'SHOW HISTORY SEGMENTS' command.

This closes #2125
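
A minimal usage sketch of the new syntax (the `spark` session and the table name `default.sample` are assumptions for illustration, not part of this patch):

```
// Visible segments only (the pre-existing behavior).
spark.sql("SHOW SEGMENTS FOR TABLE default.sample").show(false)
// All segments, including invisible ones left behind by compaction or
// CLEAN FILES; the result carries an extra "Visibility" column.
spark.sql("SHOW HISTORY SEGMENTS FOR TABLE default.sample").show(false)
```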


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/57c54fb7
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/57c54fb7
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/57c54fb7

Branch: refs/heads/master
Commit: 57c54fb7f3a4f9621f312ceadca1105df42a0384
Parents: f6990d6
Author: Zhang Zhichao <44...@qq.com>
Authored: Sun Apr 1 10:05:05 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Sun Apr 8 14:44:12 2018 +0800

----------------------------------------------------------------------
 docs/data-management-on-carbondata.md           |  7 +++-
 .../org/apache/carbondata/api/CarbonStore.scala | 43 ++++++++++++++------
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |  1 +
 .../management/CarbonShowLoadsCommand.scala     | 28 +++++++++----
 .../sql/parser/CarbonSpark2SqlParser.scala      |  7 ++--
 .../apache/spark/util/CarbonCommandSuite.scala  | 33 +++++++++++++++
 6 files changed, 95 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/57c54fb7/docs/data-management-on-carbondata.md
----------------------------------------------------------------------
diff --git a/docs/data-management-on-carbondata.md b/docs/data-management-on-carbondata.md
index a992063..2eb91bb 100644
--- a/docs/data-management-on-carbondata.md
+++ b/docs/data-management-on-carbondata.md
@@ -939,13 +939,18 @@ This tutorial is going to introduce all commands and data operations on CarbonDa
   This command is used to list the segments of CarbonData table.
 
   ```
-  SHOW SEGMENTS FOR TABLE [db_name.]table_name LIMIT number_of_segments
+  SHOW [HISTORY] SEGMENTS FOR TABLE [db_name.]table_name LIMIT number_of_segments
   ```
   
   Example:
+  Show visible segments
   ```
   SHOW SEGMENTS FOR TABLE CarbonDatabase.CarbonTable LIMIT 4
   ```
+  Show all segments, including invisible segments
+  ```
+  SHOW HISTORY SEGMENTS FOR TABLE CarbonDatabase.CarbonTable LIMIT 4
+  ```
 
 ### DELETE SEGMENT BY ID
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/57c54fb7/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index bfb1616..1f1fc7f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -42,15 +42,24 @@ object CarbonStore {
 
   def showSegments(
       limit: Option[String],
-      tableFolderPath: String): Seq[Row] = {
-    val loadMetadataDetailsArray = SegmentStatusManager.readLoadMetadata(tableFolderPath)
+      tableFolderPath: String,
+      showHistory: Boolean): Seq[Row] = {
+    val loadMetadataDetailsArray = if (showHistory) {
+      SegmentStatusManager.readLoadMetadata(tableFolderPath) ++
+      SegmentStatusManager.readLoadHistoryMetadata(tableFolderPath)
+    } else {
+      SegmentStatusManager.readLoadMetadata(tableFolderPath)
+    }
+
     if (loadMetadataDetailsArray.nonEmpty) {
       var loadMetadataDetailsSortedArray = loadMetadataDetailsArray.sortWith { (l1, l2) =>
         java.lang.Double.parseDouble(l1.getLoadName) > java.lang.Double.parseDouble(l2.getLoadName)
       }
-      if (limit.isDefined) {
+      if (!showHistory) {
         loadMetadataDetailsSortedArray = loadMetadataDetailsSortedArray
-          .filter(load => load.getVisibility.equalsIgnoreCase("true"))
+          .filter(_.getVisibility.equalsIgnoreCase("true"))
+      }
+      if (limit.isDefined) {
         val limitLoads = limit.get
         try {
           val lim = Integer.parseInt(limitLoads)
@@ -62,7 +71,6 @@ object CarbonStore {
       }
 
       loadMetadataDetailsSortedArray
-        .filter(_.getVisibility.equalsIgnoreCase("true"))
         .map { load =>
           val mergedTo =
             if (load.getMergedLoadName != null) {
@@ -85,13 +93,24 @@ object CarbonStore {
               new java.sql.Timestamp(load.getLoadEndTime)
             }
 
-          Row(
-            load.getLoadName,
-            load.getSegmentStatus.getMessage,
-            startTime,
-            endTime,
-            mergedTo,
-            load.getFileFormat.toString)
+          if (showHistory) {
+            Row(
+              load.getLoadName,
+              load.getSegmentStatus.getMessage,
+              startTime,
+              endTime,
+              mergedTo,
+              load.getFileFormat.toString,
+              load.getVisibility())
+          } else {
+            Row(
+              load.getLoadName,
+              load.getSegmentStatus.getMessage,
+              startTime,
+              endTime,
+              mergedTo,
+              load.getFileFormat.toString)
+          }
         }.toSeq
     } else {
       Seq.empty
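
The heart of the change above: when `showHistory` is set, the rows read from the history file are appended to the current table status, and the visibility filter is applied only on the non-history path. A standalone sketch of that selection logic, with a hypothetical `Load` case class standing in for the real `LoadMetadataDetails` type:

```
// Sketch only: Load is a simplified stand-in, not the real CarbonData type.
case class Load(name: String, visibility: String)

def selectLoads(current: Seq[Load], history: Seq[Load], showHistory: Boolean): Seq[Load] = {
  val all = if (showHistory) current ++ history else current
  // Sort by numeric segment name, newest first (names like "4.1" parse as Double).
  val sorted = all.sortWith(_.name.toDouble > _.name.toDouble)
  // Invisible segments are hidden only in the plain SHOW SEGMENTS path.
  if (showHistory) sorted else sorted.filter(_.visibility.equalsIgnoreCase("true"))
}
```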

http://git-wip-us.apache.org/repos/asf/carbondata/blob/57c54fb7/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index b7d4d16..fea3482 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -152,6 +152,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
   protected val EXISTS = carbonKeyWord("EXISTS")
   protected val DIMENSION = carbonKeyWord("DIMENSION")
   protected val STARTTIME = carbonKeyWord("STARTTIME")
+  protected val HISTORY = carbonKeyWord("HISTORY")
   protected val SEGMENTS = carbonKeyWord("SEGMENTS")
   protected val SEGMENT = carbonKeyWord("SEGMENT")
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/57c54fb7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
index 1e65887..cd5d8f9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
@@ -28,17 +28,28 @@ import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandExcepti
 case class CarbonShowLoadsCommand(
     databaseNameOp: Option[String],
     tableName: String,
-    limit: Option[String])
+    limit: Option[String],
+    showHistory: Boolean = false)
   extends DataCommand {
 
   // add new columns of show segments at last
   override def output: Seq[Attribute] = {
-    Seq(AttributeReference("SegmentSequenceId", StringType, nullable = false)(),
-      AttributeReference("Status", StringType, nullable = false)(),
-      AttributeReference("Load Start Time", TimestampType, nullable = false)(),
-      AttributeReference("Load End Time", TimestampType, nullable = true)(),
-      AttributeReference("Merged To", StringType, nullable = false)(),
-      AttributeReference("File Format", StringType, nullable = false)())
+    if (showHistory) {
+      Seq(AttributeReference("SegmentSequenceId", StringType, nullable = false)(),
+        AttributeReference("Status", StringType, nullable = false)(),
+        AttributeReference("Load Start Time", TimestampType, nullable = false)(),
+        AttributeReference("Load End Time", TimestampType, nullable = true)(),
+        AttributeReference("Merged To", StringType, nullable = false)(),
+        AttributeReference("File Format", StringType, nullable = false)(),
+        AttributeReference("Visibility", StringType, nullable = false)())
+    } else {
+      Seq(AttributeReference("SegmentSequenceId", StringType, nullable = false)(),
+        AttributeReference("Status", StringType, nullable = false)(),
+        AttributeReference("Load Start Time", TimestampType, nullable = false)(),
+        AttributeReference("Load End Time", TimestampType, nullable = true)(),
+        AttributeReference("Merged To", StringType, nullable = false)(),
+        AttributeReference("File Format", StringType, nullable = false)())
+    }
   }
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
@@ -49,7 +60,8 @@ case class CarbonShowLoadsCommand(
     }
     CarbonStore.showSegments(
       limit,
-      carbonTable.getMetadataPath
+      carbonTable.getMetadataPath,
+      showHistory
     )
   }
 }
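
Note on the schema above: both branches repeat the six common columns. A sketch of the same conditional schema with the shared columns factored out (a design alternative, not the committed code; it uses the same Spark catalyst `AttributeReference` API):

```
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.types.{StringType, TimestampType}

def output(showHistory: Boolean): Seq[Attribute] = {
  val base = Seq(
    AttributeReference("SegmentSequenceId", StringType, nullable = false)(),
    AttributeReference("Status", StringType, nullable = false)(),
    AttributeReference("Load Start Time", TimestampType, nullable = false)(),
    AttributeReference("Load End Time", TimestampType, nullable = true)(),
    AttributeReference("Merged To", StringType, nullable = false)(),
    AttributeReference("File Format", StringType, nullable = false)())
  // Append the Visibility column only for SHOW HISTORY SEGMENTS.
  if (showHistory) base :+ AttributeReference("Visibility", StringType, nullable = false)()
  else base
}
```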

http://git-wip-us.apache.org/repos/asf/carbondata/blob/57c54fb7/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 1b30011..dbe39fb 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -437,12 +437,13 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     }
 
   protected lazy val showLoads: Parser[LogicalPlan] =
-    SHOW ~> SEGMENTS ~> FOR ~> TABLE ~> (ident <~ ".").? ~ ident ~
+    (SHOW ~> opt(HISTORY) <~ SEGMENTS <~ FOR <~ TABLE) ~ (ident <~ ".").? ~ ident ~
     (LIMIT ~> numericLit).? <~
     opt(";") ^^ {
-      case databaseName ~ tableName ~ limit =>
+      case showHistory ~ databaseName ~ tableName ~ limit =>
         CarbonShowLoadsCommand(
-          convertDbNameToLowerCase(databaseName), tableName.toLowerCase(), limit)
+          convertDbNameToLowerCase(databaseName), tableName.toLowerCase(), limit,
+          showHistory.isDefined)
     }
 
   protected lazy val alterTableModifyDataType: Parser[LogicalPlan] =
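
The grammar change pivots on `opt(HISTORY)`, which yields an `Option` that `showHistory.isDefined` turns into the new flag. A self-contained illustration of the same combinator pattern with scala-parser-combinators (plain string literals in place of CarbonData's keyword parsers; not the real CarbonData parser):

```
import scala.util.parsing.combinator.JavaTokenParsers

object ShowSegmentsDemo extends JavaTokenParsers {
  case class ShowSegments(showHistory: Boolean, table: String)

  // SHOW [HISTORY] SEGMENTS FOR TABLE <ident>
  def show: Parser[ShowSegments] =
    ("SHOW" ~> opt("HISTORY") <~ "SEGMENTS" <~ "FOR" <~ "TABLE") ~ ident ^^ {
      case history ~ table => ShowSegments(history.isDefined, table)
    }
}

// ShowSegmentsDemo.parseAll(ShowSegmentsDemo.show, "SHOW HISTORY SEGMENTS FOR TABLE t1").get
//   == ShowSegments(showHistory = true, table = "t1")
```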

http://git-wip-us.apache.org/repos/asf/carbondata/blob/57c54fb7/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala b/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
index 230f4d8..e601043 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
@@ -193,4 +193,37 @@ class CarbonCommandSuite extends Spark2QueryTest with BeforeAndAfterAll {
     dropTable(tableName)
   }
 
+  test("show history segments") {
+    val tableName = "test_tablestatus_history"
+    sql(s"drop table if exists ${tableName}")
+    sql(s"create table ${tableName} (name String, age int) stored by 'carbondata' "
+      + "TBLPROPERTIES('AUTO_LOAD_MERGE'='true','COMPACTION_LEVEL_THRESHOLD'='2,2')")
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", tableName)
+    sql(s"insert into ${tableName} select 'abc1',1")
+    sql(s"insert into ${tableName} select 'abc2',2")
+    sql(s"insert into ${tableName} select 'abc3',3")
+    sql(s"insert into ${tableName} select 'abc4',4")
+    sql(s"insert into ${tableName} select 'abc5',5")
+    sql(s"insert into ${tableName} select 'abc6',6")
+    assert(sql(s"show segments for table ${tableName}").collect().length == 10)
+    assert(sql(s"show history segments for table ${tableName}").collect().length == 10)
+    sql(s"clean files for table ${tableName}")
+    assert(sql(s"show segments for table ${tableName}").collect().length == 2)
+    val segmentsHistoryList = sql(s"show history segments for table ${tableName}").collect()
+    assert(segmentsHistoryList.length == 10)
+    assert(segmentsHistoryList(0).getString(0).equalsIgnoreCase("5"))
+    assert(segmentsHistoryList(0).getString(6).equalsIgnoreCase("false"))
+    assert(segmentsHistoryList(0).getString(1).equalsIgnoreCase("Compacted"))
+    assert(segmentsHistoryList(1).getString(0).equalsIgnoreCase("4.1"))
+    assert(segmentsHistoryList(1).getString(6).equalsIgnoreCase("true"))
+    assert(segmentsHistoryList(1).getString(1).equalsIgnoreCase("Success"))
+    assert(segmentsHistoryList(3).getString(0).equalsIgnoreCase("3"))
+    assert(segmentsHistoryList(3).getString(6).equalsIgnoreCase("false"))
+    assert(segmentsHistoryList(3).getString(1).equalsIgnoreCase("Compacted"))
+    assert(segmentsHistoryList(7).getString(0).equalsIgnoreCase("0.2"))
+    assert(segmentsHistoryList(7).getString(6).equalsIgnoreCase("true"))
+    assert(segmentsHistoryList(7).getString(1).equalsIgnoreCase("Success"))
+    assert(sql(s"show history segments for table ${tableName} limit 3").collect().length == 3)
+    dropTable(tableName)
+  }
 }
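
For readers checking the counts asserted in the test: with AUTO_LOAD_MERGE and COMPACTION_LEVEL_THRESHOLD '2,2', every two loads are compacted, and every two level-1 compacted segments are compacted again. A tally inferred from those thresholds (consistent with the assertions above, though not spelled out in the patch):

```
// 6 inserts                 -> segments 0, 1, 2, 3, 4, 5
// level-1 compactions       -> 0.1 (from 0,1), 2.1 (from 2,3), 4.1 (from 4,5)
// level-2 compaction        -> 0.2 (from 0.1, 2.1)
// SHOW HISTORY SEGMENTS     -> 6 + 3 + 1 = 10 entries
// visible after CLEAN FILES -> 0.2 and 4.1, i.e. 2 entries for SHOW SEGMENTS
```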