Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/11/14 16:00:50 UTC

carbondata git commit: [CARBONDATA-1707][Streaming] Log the time taken by each stream batch and fix a StreamExample issue

Repository: carbondata
Updated Branches:
  refs/heads/master d062ab41d -> 667ee81f1


[CARBONDATA-1707][Streaming] Log the time taken by each stream batch and fix a StreamExample issue

Log the time taken by each stream batch and fix a StreamExample issue

This closes #1495
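
The timing side of the patch follows a simple measure-and-log pattern around the batch write, visible in the CarbonAppendableStreamSink hunk below. A minimal sketch of that pattern, assuming QueryStatistic measures from its construction time as the usage in the hunk suggests (the `timedBatch` wrapper and its `writeBatch` parameter are hypothetical names for illustration):

  import org.apache.carbondata.core.stats.QueryStatistic

  // Hypothetical wrapper illustrating the measure-and-log pattern from the
  // patch. `writeBatch` stands in for the actual data-file writing job.
  def timedBatch(batchId: Long)(writeBatch: => Unit): Unit = {
    val statistic = new QueryStatistic() // assumed to capture its creation time
    writeBatch
    // record the end timestamp against a batch-specific message ...
    statistic.addStatistics(s"add batch: $batchId", System.currentTimeMillis())
    // ... and log the elapsed interval, mirroring the sink's log line
    println(s"${statistic.getMessage}, taken time(ms): ${statistic.getTimeTaken}")
  }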


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/667ee81f
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/667ee81f
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/667ee81f

Branch: refs/heads/master
Commit: 667ee81f15555a51d0b2400e557dcfbf8e04fbe4
Parents: d062ab4
Author: QiangCai <qi...@qq.com>
Authored: Tue Nov 14 15:42:20 2017 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Nov 15 00:00:39 2017 +0800

----------------------------------------------------------------------
 .../org/apache/carbondata/examples/StreamExample.scala | 12 ++++++++----
 .../streaming/CarbonAppendableStreamSink.scala         | 13 ++++++++++++-
 2 files changed, 20 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/667ee81f/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala
index c31a0aa..4b59aad 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala
@@ -71,7 +71,8 @@ object StreamExample {
              | file struct<school:array<string>, age:int>
              | )
              | STORED BY 'carbondata'
-             | TBLPROPERTIES('sort_columns'='name', 'dictionary_include'='city')
+             | TBLPROPERTIES(
+             | 'streaming'='true', 'sort_columns'='name', 'dictionary_include'='city')
              | """.stripMargin)
       } else {
         spark.sql(
@@ -83,6 +84,8 @@ object StreamExample {
              | salary FLOAT
              | )
              | STORED BY 'carbondata'
+             | TBLPROPERTIES(
+             | 'streaming'='true', 'sort_columns'='name')
              | """.stripMargin)
       }
 
@@ -173,7 +176,8 @@ object StreamExample {
 
           qry.awaitTermination()
         } catch {
-          case _: InterruptedException =>
+          case ex =>
+            ex.printStackTrace()
             println("Done reading and writing streaming data")
         } finally {
           qry.stop()
@@ -193,14 +197,14 @@ object StreamExample {
         var index = 0
         for (_ <- 1 to 1000) {
           // write 1001 records per iteration
-          for (_ <- 0 to 100) {
+          for (_ <- 0 to 1000) {
             index = index + 1
             socketWriter.println(index.toString + ",name_" + index
                                  + ",city_" + index + "," + (index * 10000.00).toString +
                                  ",school_" + index + ":school_" + index + index + "$" + index)
           }
           socketWriter.flush()
-          Thread.sleep(2000)
+          Thread.sleep(1000)
         }
         socketWriter.close()
         System.out.println("Socket closed")
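
A side note on the exception-handling hunk above: the bare `case ex =>` matches every Throwable in Scala, including fatal errors such as OutOfMemoryError. A hedged alternative sketch keeps the original InterruptedException path for deliberate shutdown and uses scala.util.control.NonFatal, which excludes InterruptedException and fatal errors, for everything else (`awaitQuietly` is a hypothetical helper name, not part of the patch):

  import scala.util.control.NonFatal

  import org.apache.spark.sql.streaming.StreamingQuery

  def awaitQuietly(qry: StreamingQuery): Unit = {
    try {
      qry.awaitTermination()
    } catch {
      case _: InterruptedException =>
        // the query was stopped deliberately; treat it as normal completion
        println("Done reading and writing streaming data")
      case NonFatal(ex) =>
        // unexpected but recoverable failure: surface it before shutting down
        ex.printStackTrace()
    } finally {
      qry.stop()
    }
  }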

http://git-wip-us.apache.org/repos/asf/carbondata/blob/667ee81f/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
index b3f0964..1281129 100644
--- a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -20,8 +20,9 @@ package org.apache.spark.sql.execution.streaming
 import java.util.Date
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext, TaskAttemptID, TaskID, TaskType}
+import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.hadoop.mapreduce.v2.api.records.JobId
 import org.apache.spark.{SparkHadoopWriter, TaskContext}
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
@@ -35,6 +36,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.dictionary.server.DictionaryServer
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.stats.QueryStatistic
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
@@ -77,6 +79,9 @@ class CarbonAppendableStreamSink(
     if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
       CarbonAppendableStreamSink.LOGGER.info(s"Skipping already committed batch $batchId")
     } else {
+
+      val statistic = new QueryStatistic()
+
       checkOrHandOffSegment()
 
       // committer will record how this spark job commit its output
@@ -102,6 +107,10 @@ class CarbonAppendableStreamSink(
         committer,
         hadoopConf,
         server)
+
+      statistic.addStatistics(s"add batch: $batchId", System.currentTimeMillis())
+      CarbonAppendableStreamSink.LOGGER.info(
+        s"${statistic.getMessage}, taken time(ms): ${statistic.getTimeTaken}")
     }
   }
 
@@ -158,6 +167,8 @@ object CarbonAppendableStreamSink {
     val job = Job.getInstance(hadoopConf)
     job.setOutputKeyClass(classOf[Void])
     job.setOutputValueClass(classOf[InternalRow])
+    val jobId = SparkHadoopWriter.createJobID(new Date, batchId.toInt)
+    job.setJobID(jobId)
 
     val description = WriteDataFileJobDescription(
       serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
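
The job-ID hunk above gives each micro-batch its own Hadoop JobID derived from the streaming batch id, so task attempt contexts created for a batch are uniquely identifiable. A minimal sketch of that wiring, assuming SparkHadoopWriter.createJobID keeps the (Date, Int) => JobID shape used in the diff (`newBatchJob` is a hypothetical helper name):

  import java.util.Date

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.mapreduce.Job
  import org.apache.spark.SparkHadoopWriter

  // Hypothetical helper: create the Hadoop job for one streaming micro-batch
  // and stamp it with a JobID derived from the batch id.
  def newBatchJob(hadoopConf: Configuration, batchId: Long): Job = {
    val job = Job.getInstance(hadoopConf)
    job.setJobID(SparkHadoopWriter.createJobID(new Date, batchId.toInt))
    job
  }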