Posted to commits@carbondata.apache.org by qi...@apache.org on 2017/11/15 01:36:34 UTC

carbondata git commit: [CARBONDATA-1612][CARBONDATA-1615][Streaming] Support delete segment for streaming table

Repository: carbondata
Updated Branches:
  refs/heads/master 1155d4d8f -> 09d020561


[CARBONDATA-1612][CARBONDATA-1615][Streaming] Support delete segment for streaming table

This closes #1497


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/09d02056
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/09d02056
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/09d02056

Branch: refs/heads/master
Commit: 09d020561a6a5b7bd90b769e608b4130baa43667
Parents: 1155d4d
Author: Jacky Li <ja...@qq.com>
Authored: Tue Nov 14 20:41:24 2017 +0800
Committer: QiangCai <qi...@qq.com>
Committed: Wed Nov 15 09:35:53 2017 +0800

----------------------------------------------------------------------
 .../spark/sql/CarbonCatalystOperators.scala     |  4 +-
 .../TestStreamingTableOperation.scala           | 55 +++++++++++++++++++-
 2 files changed, 55 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
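
For context, the tests added below exercise the two segment-deletion forms this
change enables on streaming tables: delete by segment ID and delete by segment
load start time. A minimal sketch in the style of the new tests (the table name
comes from this patch; the segment IDs and the timestamp are illustrative):

    // delete specific segments by ID
    sql("delete from table streaming.stream_table_delete where segment.id in (0, 1)")

    // delete every segment that started loading before the given time
    sql("delete from table streaming.stream_table_delete " +
      "where segment.starttime before '2999-10-01 01:00:00'")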


http://git-wip-us.apache.org/repos/asf/carbondata/blob/09d02056/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index 48f1a09..62632df 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -119,10 +119,10 @@ case class ShowLoadsCommand(
   extends Command {
 
   override def output: Seq[Attribute] = {
-    Seq(AttributeReference("SegmentSequenceId", StringType, nullable = false)(),
+    Seq(AttributeReference("Segment Id", StringType, nullable = false)(),
       AttributeReference("Status", StringType, nullable = false)(),
       AttributeReference("Load Start Time", TimestampType, nullable = false)(),
-      AttributeReference("Load End Time", TimestampType, nullable = false)(),
+      AttributeReference("Load End Time", TimestampType, nullable = true)(),
       AttributeReference("Merged To", StringType, nullable = false)())
   }
 }
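
Note the two changes in this hunk: the first output column is renamed from
SegmentSequenceId to "Segment Id", and "Load End Time" becomes nullable,
presumably because a streaming segment that is still accepting rows has no end
time yet. A minimal sketch of reading SHOW SEGMENTS output under that
assumption (the null guard is an assumed caller-side pattern, not part of this
patch):

    val segments = sql("show segments for table streaming.stream_table_delete").collect()
    segments.foreach { row =>
      // column 3 ("Load End Time") may be null while a streaming segment is open
      val endTime = if (row.isNullAt(3)) "<still streaming>" else row.getTimestamp(3).toString
      println(s"segment=${row.getString(0)} status=${row.getString(1)} endTime=$endTime")
    }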

http://git-wip-us.apache.org/repos/asf/carbondata/blob/09d02056/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 3fb1424..b29cca4 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -19,6 +19,7 @@ package org.apache.spark.carbondata
 
 import java.io.{File, PrintWriter}
 import java.net.ServerSocket
+import java.util.{Calendar, Date}
 import java.util.concurrent.Executors
 
 import scala.collection.mutable
@@ -103,6 +104,9 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
 
     // 10. fault tolerant
     createTable(tableName = "stream_table_tolerant", streaming = true, withBatchLoad = true)
+
+    // 11. table for delete segment test
+    createTable(tableName = "stream_table_delete", streaming = true, withBatchLoad = false)
   }
 
   test("validate streaming property") {
@@ -181,6 +185,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists streaming.stream_table_compact")
     sql("drop table if exists streaming.stream_table_new")
     sql("drop table if exists streaming.stream_table_tolerant")
+    sql("drop table if exists streaming.stream_table_delete")
   }
 
  // normal table does not support streaming ingest
@@ -578,8 +583,6 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       badRecordAction = "force",
       handoffSize = 1024L * 200
     )
-    sql("show segments for table streaming.stream_table_new").show(100, false)
-
     assert(sql("show segments for table streaming.stream_table_new").count() == 4)
 
     checkAnswer(
@@ -588,6 +591,51 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     )
   }
 
+  test("test deleting streaming segment by ID while ingesting") {
+    executeStreamingIngest(
+      tableName = "stream_table_delete",
+      batchNums = 6,
+      rowNumsEachBatch = 10000,
+      intervalOfSource = 3,
+      intervalOfIngest = 5,
+      continueSeconds = 15,
+      generateBadRecords = false,
+      badRecordAction = "force",
+      handoffSize = 1024L * 200
+    )
+    val beforeDelete = sql("show segments for table streaming.stream_table_delete").collect()
+    val segmentId = beforeDelete.map(_.getString(0)).mkString(",")
+    sql(s"delete from table streaming.stream_table_delete where segment.id in ($segmentId) ")
+
+    val rows = sql("show segments for table streaming.stream_table_delete").collect()
+    rows.foreach { row =>
+      assertResult(SegmentStatus.MARKED_FOR_DELETE.getMessage)(row.getString(1))
+    }
+  }
+
+  test("test deleting streaming segment by date while ingesting") {
+    executeStreamingIngest(
+      tableName = "stream_table_delete",
+      batchNums = 6,
+      rowNumsEachBatch = 10000,
+      intervalOfSource = 3,
+      intervalOfIngest = 5,
+      continueSeconds = 15,
+      generateBadRecords = false,
+      badRecordAction = "force",
+      handoffSize = 1024L * 200
+    )
+    val beforeDelete = sql("show segments for table streaming.stream_table_delete").collect()
+
+    sql(s"delete from table streaming.stream_table_delete where segment.starttime before '2999-10-01 01:00:00'")
+
+    val rows = sql("show segments for table streaming.stream_table_delete").collect()
+    assertResult(beforeDelete.length)(rows.length)
+    rows.foreach { row =>
+      assertResult(SegmentStatus.MARKED_FOR_DELETE.getMessage)(row.getString(1))
+    }
+  }
+
   def createWriteSocketThread(
       serverSocket: ServerSocket,
       writeNums: Int,
@@ -675,6 +723,9 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     }
   }
 
+  /**
+   * start an ingestion thread: write `rowNumsEachBatch` rows per batch, for `batchNums` batches.
+   */
   def executeStreamingIngest(
       tableName: String,
       batchNums: Int,
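
As a hedged follow-up to the new tests (assumed behavior, not asserted by this
patch): once every segment is marked for delete, a query should see none of
their rows, so a count check in the same style as the existing tests would be:

    // rows in segments marked for delete should be invisible to queries
    checkAnswer(
      sql("select count(*) from streaming.stream_table_delete"),
      Seq(Row(0L)))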