You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/04/17 06:40:36 UTC

carbondata git commit: [CARBONDATA-2335] Fix auto handoff for preaggregate table with streaming

Repository: carbondata
Updated Branches:
  refs/heads/master b0e6fbea2 -> 78e4d0da3


[CARBONDATA-2335] Fix auto handoff for preaggregate table with streaming

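Problem: The MetaListener was not called in the auto handoff case, so the
child load commands were never created, which caused a NullPointerException.
Solution: Fire LoadMetadataEvent when the stream sink is created and pass the
same OperationContext to the sink and to the auto handoff thread.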

This closes #2160


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/78e4d0da
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/78e4d0da
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/78e4d0da

Branch: refs/heads/master
Commit: 78e4d0da33e51f124476ae100cc6a0109960c107
Parents: b0e6fbe
Author: kunal642 <ku...@gmail.com>
Authored: Wed Apr 11 20:26:01 2018 +0530
Committer: kumarvishal <ku...@gmail.com>
Committed: Tue Apr 17 14:39:35 2018 +0800

----------------------------------------------------------------------
 .../streaming/StreamSinkFactory.scala           |  9 ++++--
 .../streaming/CarbonAppendableStreamSink.scala  |  9 +++---
 .../TestStreamingTableOperation.scala           | 31 +++++++++++++++++++-
 3 files changed, 41 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/78e4d0da/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
index 1d4f7fc..c162ea7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
@@ -38,7 +38,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
-import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
+import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
 import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
 import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
 import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
@@ -110,6 +110,9 @@ object StreamSinkFactory {
     val segmentId = getStreamSegmentId(carbonTable)
     carbonLoadModel.setSegmentId(segmentId)
 
+    // Used to generate load commands for child tables in case auto-handoff is fired.
+    val loadMetaEvent = new LoadMetadataEvent(carbonTable, false)
+    OperationListenerBus.getInstance().fireEvent(loadMetaEvent, operationContext)
     // start server if necessary
     val server = startDictionaryServer(
       sparkSession,
@@ -120,6 +123,7 @@ object StreamSinkFactory {
     } else {
       carbonLoadModel.setUseOnePass(false)
     }
+
     // default is carbon appended stream sink
     val carbonAppendableStreamSink = new CarbonAppendableStreamSink(
       sparkSession,
@@ -127,7 +131,8 @@ object StreamSinkFactory {
       segmentId,
       parameters,
       carbonLoadModel,
-      server)
+      server,
+      operationContext)
 
     // fire post event before streamin is started
     val loadTablePostExecutionEvent = new LoadTablePostExecutionEvent(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/78e4d0da/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
index 97a1a16..44f96bd 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -62,7 +62,8 @@ class CarbonAppendableStreamSink(
     var currentSegmentId: String,
     parameters: Map[String, String],
     carbonLoadModel: CarbonLoadModel,
-    server: Option[DictionaryServer]) extends Sink {
+    server: Option[DictionaryServer],
+    operationContext: OperationContext) extends Sink {
 
   private val fileLogPath = CarbonTablePath.getStreamingLogDir(carbonTable.getTablePath)
   private val fileLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, fileLogPath)
@@ -110,7 +111,6 @@ class CarbonAppendableStreamSink(
 
       // fire pre event on every batch add
       // in case of streaming options and optionsFinal can be same
-      val operationContext = new OperationContext
       val loadTablePreExecutionEvent = new LoadTablePreExecutionEvent(
         carbonTable.getCarbonTableIdentifier,
         carbonLoadModel,
@@ -118,8 +118,7 @@ class CarbonAppendableStreamSink(
         false,
         parameters.asJava,
         parameters.asJava,
-        false
-      )
+        false)
       OperationListenerBus.getInstance().fireEvent(loadTablePreExecutionEvent, operationContext)
       checkOrHandOffSegment()
 
@@ -180,7 +179,7 @@ class CarbonAppendableStreamSink(
         if (enableAutoHandoff) {
           StreamHandoffRDD.startStreamingHandoffThread(
             carbonLoadModel,
-            new OperationContext,
+            operationContext,
             sparkSession,
             false)
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/78e4d0da/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 71ce2b2..aa068fc 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -179,7 +179,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     assertResult("Alter table change datatype is not allowed for streaming table")(changeDataTypeException.getMessage)
   }
 
-  override def afterAll {
+  override def  afterAll {
     dropTable()
     sql("USE default")
     sql("DROP DATABASE IF EXISTS streaming CASCADE")
@@ -1454,6 +1454,35 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     )
   }
 
+  test("test autohandoff with preaggregate tables") {
+    sql("drop table if exists maintable")
+    createTable(tableName = "maintable", streaming = true, withBatchLoad = false)
+    sql("create datamap p1 on table maintable using 'preaggregate' as select name, sum(id) from maintable group by name")
+    executeStreamingIngest(
+      tableName = "maintable",
+      batchNums = 2,
+      rowNumsEachBatch = 100,
+      intervalOfSource = 5,
+      intervalOfIngest = 5,
+      continueSeconds = 20,
+      generateBadRecords = false,
+      badRecordAction = "force",
+      handoffSize = 1L,
+      autoHandoff = false)
+    executeStreamingIngest(
+      tableName = "maintable",
+      batchNums = 2,
+      rowNumsEachBatch = 100,
+      intervalOfSource = 5,
+      intervalOfIngest = 5,
+      continueSeconds = 20,
+      generateBadRecords = false,
+      badRecordAction = "force",
+      handoffSize = 1L,
+      autoHandoff = true)
+    checkAnswer(sql("select count(*) from maintable_p1"), Seq(Row(200)))
+  }
+
   test("block drop streaming table while streaming is in progress") {
     val identifier = new TableIdentifier("stream_table_drop", Option("streaming"))
     val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)