Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/04/17 06:40:36 UTC
carbondata git commit: [CARBONDATA-2335] Fix auto handoff for preaggregate table with streaming
Repository: carbondata
Updated Branches:
refs/heads/master b0e6fbea2 -> 78e4d0da3
[CARBONDATA-2335] Fix auto handoff for preaggregate table with streaming
Solution: The MetaListener was not called in the auto handoff case, so the
child load commands were never created, which resulted in a NullPointerException.
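In essence, the fix fires a LoadMetadataEvent on the OperationListenerBus while the
stream sink is being set up, so the preaggregate listeners can prepare the child load
commands before any handoff happens. A minimal sketch of that step (assuming carbonTable
and operationContext are already in scope, as they are in StreamSinkFactory):

    import org.apache.carbondata.events.OperationListenerBus
    import org.apache.carbondata.processing.loading.events.LoadEvents.LoadMetadataEvent

    // Fire the metadata event once when the sink is created; listeners for
    // preaggregate child tables generate their load commands against this
    // operationContext. The boolean flag simply mirrors the patch below (false).
    val loadMetaEvent = new LoadMetadataEvent(carbonTable, false)
    OperationListenerBus.getInstance().fireEvent(loadMetaEvent, operationContext)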
This closes #2160
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/78e4d0da
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/78e4d0da
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/78e4d0da
Branch: refs/heads/master
Commit: 78e4d0da33e51f124476ae100cc6a0109960c107
Parents: b0e6fbe
Author: kunal642 <ku...@gmail.com>
Authored: Wed Apr 11 20:26:01 2018 +0530
Committer: kumarvishal <ku...@gmail.com>
Committed: Tue Apr 17 14:39:35 2018 +0800
----------------------------------------------------------------------
.../streaming/StreamSinkFactory.scala | 9 ++++--
.../streaming/CarbonAppendableStreamSink.scala | 9 +++---
.../TestStreamingTableOperation.scala | 31 +++++++++++++++++++-
3 files changed, 41 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/78e4d0da/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
index 1d4f7fc..c162ea7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
@@ -38,7 +38,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
-import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
+import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
@@ -110,6 +110,9 @@ object StreamSinkFactory {
val segmentId = getStreamSegmentId(carbonTable)
carbonLoadModel.setSegmentId(segmentId)
+ // Used to generate load commands for child tables in case auto-handoff is fired.
+ val loadMetaEvent = new LoadMetadataEvent(carbonTable, false)
+ OperationListenerBus.getInstance().fireEvent(loadMetaEvent, operationContext)
// start server if necessary
val server = startDictionaryServer(
sparkSession,
@@ -120,6 +123,7 @@ object StreamSinkFactory {
} else {
carbonLoadModel.setUseOnePass(false)
}
+
// default is carbon appended stream sink
val carbonAppendableStreamSink = new CarbonAppendableStreamSink(
sparkSession,
@@ -127,7 +131,8 @@ object StreamSinkFactory {
segmentId,
parameters,
carbonLoadModel,
- server)
+ server,
+ operationContext)
// fire post event before streaming is started
val loadTablePostExecutionEvent = new LoadTablePostExecutionEvent(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/78e4d0da/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
index 97a1a16..44f96bd 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -62,7 +62,8 @@ class CarbonAppendableStreamSink(
var currentSegmentId: String,
parameters: Map[String, String],
carbonLoadModel: CarbonLoadModel,
- server: Option[DictionaryServer]) extends Sink {
+ server: Option[DictionaryServer],
+ operationContext: OperationContext) extends Sink {
private val fileLogPath = CarbonTablePath.getStreamingLogDir(carbonTable.getTablePath)
private val fileLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, fileLogPath)
@@ -110,7 +111,6 @@ class CarbonAppendableStreamSink(
// fire pre event on every batch add
// in case of streaming options and optionsFinal can be same
- val operationContext = new OperationContext
val loadTablePreExecutionEvent = new LoadTablePreExecutionEvent(
carbonTable.getCarbonTableIdentifier,
carbonLoadModel,
@@ -118,8 +118,7 @@ class CarbonAppendableStreamSink(
false,
parameters.asJava,
parameters.asJava,
- false
- )
+ false)
OperationListenerBus.getInstance().fireEvent(loadTablePreExecutionEvent, operationContext)
checkOrHandOffSegment()
@@ -180,7 +179,7 @@ class CarbonAppendableStreamSink(
if (enableAutoHandoff) {
StreamHandoffRDD.startStreamingHandoffThread(
carbonLoadModel,
- new OperationContext,
+ operationContext,
sparkSession,
false)
}
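The other half of the fix is that the sink keeps the OperationContext it was constructed
with and hands it to the handoff thread instead of creating a fresh one, so handoff-time
listeners can see the child load commands prepared earlier. A rough sketch of the relevant
call (carbonLoadModel and sparkSession being the sink's own fields, and StreamHandoffRDD
imported as in CarbonAppendableStreamSink.scala):

    // Reuse the context the sink was constructed with; before this patch a
    // fresh OperationContext was created here, which is what produced the NPE.
    StreamHandoffRDD.startStreamingHandoffThread(
      carbonLoadModel,
      operationContext,
      sparkSession,
      false)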
http://git-wip-us.apache.org/repos/asf/carbondata/blob/78e4d0da/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 71ce2b2..aa068fc 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -179,7 +179,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
assertResult("Alter table change datatype is not allowed for streaming table")(changeDataTypeException.getMessage)
}
- override def afterAll {
+ override def afterAll {
dropTable()
sql("USE default")
sql("DROP DATABASE IF EXISTS streaming CASCADE")
@@ -1454,6 +1454,35 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
)
}
+ test("test autohandoff with preaggregate tables") {
+ sql("drop table if exists maintable")
+ createTable(tableName = "maintable", streaming = true, withBatchLoad = false)
+ sql("create datamap p1 on table maintable using 'preaggregate' as select name, sum(id) from maintable group by name")
+ executeStreamingIngest(
+ tableName = "maintable",
+ batchNums = 2,
+ rowNumsEachBatch = 100,
+ intervalOfSource = 5,
+ intervalOfIngest = 5,
+ continueSeconds = 20,
+ generateBadRecords = false,
+ badRecordAction = "force",
+ handoffSize = 1L,
+ autoHandoff = false)
+ executeStreamingIngest(
+ tableName = "maintable",
+ batchNums = 2,
+ rowNumsEachBatch = 100,
+ intervalOfSource = 5,
+ intervalOfIngest = 5,
+ continueSeconds = 20,
+ generateBadRecords = false,
+ badRecordAction = "force",
+ handoffSize = 1L,
+ autoHandoff = true)
+ checkAnswer(sql("select count(*) from maintable_p1"), Seq(Row(200)))
+ }
+
test("block drop streaming table while streaming is in progress") {
val identifier = new TableIdentifier("stream_table_drop", Option("streaming"))
val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)