You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/11/14 16:00:50 UTC
carbondata git commit: [CARBONDATA-1707][Streaming] Log the time
taken by each stream batch and fix StreamExample issue
Repository: carbondata
Updated Branches:
refs/heads/master d062ab41d -> 667ee81f1
[CARBONDATA-1707][Streaming] Log the time taken by each stream batch and fix StreamExample issue
Log the time taken by each stream batch and fix StreamExample issue
This closes #1495
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/667ee81f
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/667ee81f
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/667ee81f
Branch: refs/heads/master
Commit: 667ee81f15555a51d0b2400e557dcfbf8e04fbe4
Parents: d062ab4
Author: QiangCai <qi...@qq.com>
Authored: Tue Nov 14 15:42:20 2017 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Nov 15 00:00:39 2017 +0800
----------------------------------------------------------------------
.../org/apache/carbondata/examples/StreamExample.scala | 12 ++++++++----
.../streaming/CarbonAppendableStreamSink.scala | 13 ++++++++++++-
2 files changed, 20 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/667ee81f/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala
index c31a0aa..4b59aad 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala
@@ -71,7 +71,8 @@ object StreamExample {
| file struct<school:array<string>, age:int>
| )
| STORED BY 'carbondata'
- | TBLPROPERTIES('sort_columns'='name', 'dictionary_include'='city')
+ | TBLPROPERTIES(
+ | 'streaming'='true', 'sort_columns'='name', 'dictionary_include'='city')
| """.stripMargin)
} else {
spark.sql(
@@ -83,6 +84,8 @@ object StreamExample {
| salary FLOAT
| )
| STORED BY 'carbondata'
+ | TBLPROPERTIES(
+ | 'streaming'='true', 'sort_columns'='name')
| """.stripMargin)
}
@@ -173,7 +176,8 @@ object StreamExample {
qry.awaitTermination()
} catch {
- case _: InterruptedException =>
+ case ex =>
+ ex.printStackTrace()
println("Done reading and writing streaming data")
} finally {
qry.stop()
@@ -193,14 +197,14 @@ object StreamExample {
var index = 0
for (_ <- 1 to 1000) {
// write 5 records per iteration
- for (_ <- 0 to 100) {
+ for (_ <- 0 to 1000) {
index = index + 1
socketWriter.println(index.toString + ",name_" + index
+ ",city_" + index + "," + (index * 10000.00).toString +
",school_" + index + ":school_" + index + index + "$" + index)
}
socketWriter.flush()
- Thread.sleep(2000)
+ Thread.sleep(1000)
}
socketWriter.close()
System.out.println("Socket closed")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/667ee81f/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
index b3f0964..1281129 100644
--- a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -20,8 +20,9 @@ package org.apache.spark.sql.execution.streaming
import java.util.Date
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext, TaskAttemptID, TaskID, TaskType}
+import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.hadoop.mapreduce.v2.api.records.JobId
import org.apache.spark.{SparkHadoopWriter, TaskContext}
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
@@ -35,6 +36,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.dictionary.server.DictionaryServer
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.stats.QueryStatistic
import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
@@ -77,6 +79,9 @@ class CarbonAppendableStreamSink(
if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
CarbonAppendableStreamSink.LOGGER.info(s"Skipping already committed batch $batchId")
} else {
+
+ val statistic = new QueryStatistic()
+
checkOrHandOffSegment()
// committer will record how this spark job commit its output
@@ -102,6 +107,10 @@ class CarbonAppendableStreamSink(
committer,
hadoopConf,
server)
+
+ statistic.addStatistics(s"add batch: $batchId", System.currentTimeMillis())
+ CarbonAppendableStreamSink.LOGGER.info(
+ s"${statistic.getMessage}, taken time(ms): ${statistic.getTimeTaken}")
}
}
@@ -158,6 +167,8 @@ object CarbonAppendableStreamSink {
val job = Job.getInstance(hadoopConf)
job.setOutputKeyClass(classOf[Void])
job.setOutputValueClass(classOf[InternalRow])
+ val jobId = SparkHadoopWriter.createJobID(new Date, batchId.toInt)
+ job.setJobID(jobId)
val description = WriteDataFileJobDescription(
serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),