You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2017/11/15 01:36:34 UTC
carbondata git commit: [CARBONDATA-1612][CARBONDATA-1615][Streaming]
Support delete segment for streaming table
Repository: carbondata
Updated Branches:
refs/heads/master 1155d4d8f -> 09d020561
[CARBONDATA-1612][CARBONDATA-1615][Streaming] Support delete segment for streaming table
This closes #1497
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/09d02056
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/09d02056
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/09d02056
Branch: refs/heads/master
Commit: 09d020561a6a5b7bd90b769e608b4130baa43667
Parents: 1155d4d
Author: Jacky Li <ja...@qq.com>
Authored: Tue Nov 14 20:41:24 2017 +0800
Committer: QiangCai <qi...@qq.com>
Committed: Wed Nov 15 09:35:53 2017 +0800
----------------------------------------------------------------------
.../spark/sql/CarbonCatalystOperators.scala | 4 +-
.../TestStreamingTableOperation.scala | 55 +++++++++++++++++++-
2 files changed, 55 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/09d02056/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index 48f1a09..62632df 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -119,10 +119,10 @@ case class ShowLoadsCommand(
extends Command {
override def output: Seq[Attribute] = {
- Seq(AttributeReference("SegmentSequenceId", StringType, nullable = false)(),
+ Seq(AttributeReference("Segment Id", StringType, nullable = false)(),
AttributeReference("Status", StringType, nullable = false)(),
AttributeReference("Load Start Time", TimestampType, nullable = false)(),
- AttributeReference("Load End Time", TimestampType, nullable = false)(),
+ AttributeReference("Load End Time", TimestampType, nullable = true)(),
AttributeReference("Merged To", StringType, nullable = false)())
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/09d02056/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 3fb1424..b29cca4 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -19,6 +19,7 @@ package org.apache.spark.carbondata
import java.io.{File, PrintWriter}
import java.net.ServerSocket
+import java.util.{Calendar, Date}
import java.util.concurrent.Executors
import scala.collection.mutable
@@ -103,6 +104,9 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
// 10. fault tolerant
createTable(tableName = "stream_table_tolerant", streaming = true, withBatchLoad = true)
+
+ // 11. table for delete segment test
+ createTable(tableName = "stream_table_delete", streaming = true, withBatchLoad = false)
}
test("validate streaming property") {
@@ -181,6 +185,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
sql("drop table if exists streaming.stream_table_compact")
sql("drop table if exists streaming.stream_table_new")
sql("drop table if exists streaming.stream_table_tolerant")
+ sql("drop table if exists streaming.stream_table_delete")
}
// normal table not support streaming ingest
@@ -578,8 +583,6 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
badRecordAction = "force",
handoffSize = 1024L * 200
)
- sql("show segments for table streaming.stream_table_new").show(100, false)
-
assert(sql("show segments for table streaming.stream_table_new").count() == 4)
checkAnswer(
@@ -588,6 +591,51 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
)
}
+ test("test deleting streaming segment by ID while ingesting") {
+ executeStreamingIngest(
+ tableName = "stream_table_delete",
+ batchNums = 6,
+ rowNumsEachBatch = 10000,
+ intervalOfSource = 3,
+ intervalOfIngest = 5,
+ continueSeconds = 15,
+ generateBadRecords = false,
+ badRecordAction = "force",
+ handoffSize = 1024L * 200
+ )
+ val beforeDelete = sql("show segments for table streaming.stream_table_delete").collect()
+ val segmentId = beforeDelete.map(_.getString(0)).mkString(",")
+ sql(s"delete from table streaming.stream_table_delete where segment.id in ($segmentId) ")
+
+ val rows = sql("show segments for table streaming.stream_table_delete").collect()
+ rows.foreach { row =>
+ assertResult(SegmentStatus.MARKED_FOR_DELETE.getMessage)(row.getString(1))
+ }
+ }
+
+ test("test deleting streaming segment by date while ingesting") {
+ executeStreamingIngest(
+ tableName = "stream_table_delete",
+ batchNums = 6,
+ rowNumsEachBatch = 10000,
+ intervalOfSource = 3,
+ intervalOfIngest = 5,
+ continueSeconds = 15,
+ generateBadRecords = false,
+ badRecordAction = "force",
+ handoffSize = 1024L * 200
+ )
+ val beforeDelete = sql("show segments for table streaming.stream_table_delete").collect()
+
+ sql(s"delete from table streaming.stream_table_delete where segment.starttime before '2999-10-01 01:00:00'")
+
+ val rows = sql("show segments for table streaming.stream_table_delete").collect()
+ assertResult(beforeDelete.length)(rows.length)
+ rows.foreach { row =>
+ assertResult(SegmentStatus.MARKED_FOR_DELETE.getMessage)(row.getString(1))
+ }
+ }
+
def createWriteSocketThread(
serverSocket: ServerSocket,
writeNums: Int,
@@ -675,6 +723,9 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
}
}
+ /**
+ * start ingestion thread: write `rowNumsEachBatch` rows repeatedly for `batchNums` times.
+ */
def executeStreamingIngest(
tableName: String,
batchNums: Int,