Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/06/14 17:38:50 UTC
carbondata git commit: [CARBONDATA-2599] Use RowStreamParserImp as default value of config 'carbon.stream.parser'
Repository: carbondata
Updated Branches:
refs/heads/master efad40d57 -> f1163524f
[CARBONDATA-2599] Use RowStreamParserImp as default value of config 'carbon.stream.parser'
The parser 'RowStreamParserImp' is used more often in real-world scenarios, so use 'RowStreamParserImp' as the default value of the config 'carbon.stream.parser'.
This closes #2370
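With this change, pipelines that feed raw CSV lines and relied on the old default must pin the CSV parser explicitly, as the updated guide and examples below do. A minimal sketch, assuming the spark-shell setup from docs/streaming-guide.md (with `readSocketDF` and `tablePath` defined as in that guide):

```scala
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.streaming.parser.CarbonStreamParser

// CSV-line input: request CSVStreamParserImp explicitly, since it is
// no longer the default after this commit.
val qry = readSocketDF.writeStream
  .format("carbondata")
  .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(tablePath))
  .option("dbName", "default")
  .option("tableName", "carbon_table")
  .option(CarbonStreamParser.CARBON_STREAM_PARSER,
    CarbonStreamParser.CARBON_STREAM_PARSER_CSV)
  .start()
```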
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f1163524
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f1163524
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f1163524
Branch: refs/heads/master
Commit: f1163524f5adab2dfeab992e17d6aac2b5bacf47
Parents: efad40d
Author: Zhang Zhichao <44...@qq.com>
Authored: Tue Jun 12 23:55:57 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Fri Jun 15 01:38:27 2018 +0800
----------------------------------------------------------------------
docs/streaming-guide.md | 11 ++++++-----
.../carbondata/examples/SparkStreamingExample.scala | 3 ---
.../examples/StreamingWithRowParserExample.scala | 3 ---
.../carbondata/examples/StructuredStreamingExample.scala | 3 +++
.../spark/carbondata/TestStreamingTableOperation.scala | 5 +++++
.../carbondata/TestStreamingTableWithRowParser.scala | 3 ---
.../carbondata/streaming/parser/CarbonStreamParser.java | 4 +++-
7 files changed, 17 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1163524/docs/streaming-guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming-guide.md b/docs/streaming-guide.md
index a9b174f..a9284e6 100644
--- a/docs/streaming-guide.md
+++ b/docs/streaming-guide.md
@@ -28,6 +28,7 @@ Start spark-shell in new terminal, type :paste, then copy and run the following
import org.apache.spark.sql.CarbonSession._
import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
import org.apache.carbondata.core.util.path.CarbonTablePath
+ import org.apache.carbondata.streaming.parser.CarbonStreamParser
val warehouse = new File("./warehouse").getCanonicalPath
val metastore = new File("./metastore").getCanonicalPath
@@ -71,6 +72,8 @@ Start spark-shell in new terminal, type :paste, then copy and run the following
.option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(tablePath))
.option("dbName", "default")
.option("tableName", "carbon_table")
+ .option(CarbonStreamParser.CARBON_STREAM_PARSER,
+ CarbonStreamParser.CARBON_STREAM_PARSER_CSV)
.start()
// start new thread to show data
@@ -157,13 +160,13 @@ Config the property "carbon.stream.parser" to define a stream parser to convert
property name | default | description
--- | --- | ---
-carbon.stream.parser | org.apache.carbondata.streaming.parser.CSVStreamParserImp | the class of the stream parser
+carbon.stream.parser | org.apache.carbondata.streaming.parser.RowStreamParserImp | the class of the stream parser
Currently CarbonData supports two parsers, as follows:
-**1. org.apache.carbondata.streaming.parser.CSVStreamParserImp**: This is the default stream parser, it gets a line data(String type) from the first index of InternalRow and converts this String to Object[].
+**1. org.apache.carbondata.streaming.parser.CSVStreamParserImp**: This parser gets a line of data (String type) from the first index of InternalRow and converts the String to Object[].
-**2. org.apache.carbondata.streaming.parser.RowStreamParserImp**: This stream parser will auto convert InternalRow to Object[] according to schema of this `DataSet`, for example:
+**2. org.apache.carbondata.streaming.parser.RowStreamParserImp**: This is the default stream parser; it automatically converts InternalRow to Object[] according to the schema of the `DataSet`, for example:
```scala
case class FileElement(school: Array[String], age: Int)
@@ -191,8 +194,6 @@ Currently CarbonData support two parsers, as following:
.option("checkpointLocation", tablePath.getStreamingCheckpointDir)
.option("dbName", "default")
.option("tableName", "carbon_table")
- .option(CarbonStreamParser.CARBON_STREAM_PARSER,
- CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)
.start()
...
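Illustrating the new default in the guide's terms: with RowStreamParserImp implied, a typed Dataset built from the guide's case classes can be streamed without naming a parser at all. A sketch, assuming a CarbonSession-enabled SparkSession `spark`, the guide's `tablePath`, and input lines shaped like `id,name,city,salary,school1$school2,age` (the field layout is illustrative):

```scala
import org.apache.carbondata.core.util.path.CarbonTablePath
import spark.implicits._

val qry = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9099)
  .load()
  .as[String]
  .map(_.split(","))
  .map { fields =>
    // FileElement / StreamData are the case classes from the guide above
    StreamData(fields(0).toInt, fields(1), fields(2), fields(3).toFloat,
      FileElement(fields(4).split("\\$"), fields(5).toInt))
  }
  .writeStream
  .format("carbondata")
  .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(tablePath))
  .option("dbName", "default")
  .option("tableName", "carbon_table")
  .start() // no CARBON_STREAM_PARSER option: RowStreamParserImp is now the default
```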
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1163524/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkStreamingExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkStreamingExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkStreamingExample.scala
index 27ea893..beaeee1 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkStreamingExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkStreamingExample.scala
@@ -30,7 +30,6 @@ import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.examples.util.ExampleUtils
import org.apache.carbondata.streaming.CarbonSparkStreamingListener
-import org.apache.carbondata.streaming.parser.CarbonStreamParser
/**
* This example introduces how to use Spark Streaming to write data
@@ -172,8 +171,6 @@ object SparkStreamingExample {
" at batch time: " + time.toString() +
" the count of received data: " + df.count())
CarbonSparkStreamingFactory.getStreamSparkStreamingWriter(spark, "default", tableName)
- .option(CarbonStreamParser.CARBON_STREAM_PARSER,
- CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)
.mode(SaveMode.Append)
.writeStreamData(df, time)
}}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1163524/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingWithRowParserExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingWithRowParserExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingWithRowParserExample.scala
index 109629e..ceb3d0f 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingWithRowParserExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingWithRowParserExample.scala
@@ -25,7 +25,6 @@ import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.examples.util.ExampleUtils
-import org.apache.carbondata.streaming.parser.CarbonStreamParser
case class FileElement(school: Array[String], age: Int)
case class StreamData(id: Int, name: String, city: String, salary: Float, file: FileElement)
@@ -170,8 +169,6 @@ object StreamingWithRowParserExample {
.option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(tablePath))
.option("dbName", "default")
.option("tableName", "stream_table_with_row_parser")
- .option(CarbonStreamParser.CARBON_STREAM_PARSER,
- CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)
.start()
qry.awaitTermination()
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1163524/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala
index 38a1941..f88d8ee 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.examples.util.ExampleUtils
+import org.apache.carbondata.streaming.parser.CarbonStreamParser
// scalastyle:off println
object StructuredStreamingExample {
@@ -156,6 +157,8 @@ object StructuredStreamingExample {
CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath))
.option("dbName", "default")
.option("tableName", "stream_table")
+ .option(CarbonStreamParser.CARBON_STREAM_PARSER,
+ CarbonStreamParser.CARBON_STREAM_PARSER_CSV)
.start()
qry.awaitTermination()
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1163524/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 325722d..3253c3d 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -40,6 +40,7 @@ import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.spark.exception.ProcessMetaDataException
+import org.apache.carbondata.streaming.parser.CarbonStreamParser
class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
@@ -1713,6 +1714,8 @@ sql("drop table if exists streaming.bad_record_ignore")
.option("BAD_RECORD_PATH", badRecordsPath)
.option("dbName", tableIdentifier.database.get)
.option("tableName", tableIdentifier.table)
+ .option(CarbonStreamParser.CARBON_STREAM_PARSER,
+ CarbonStreamParser.CARBON_STREAM_PARSER_CSV)
.option(CarbonCommonConstants.HANDOFF_SIZE, handoffSize)
.option("timestampformat", CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
.option(CarbonCommonConstants.ENABLE_AUTO_HANDOFF, autoHandoff)
@@ -1830,6 +1833,8 @@ sql("drop table if exists streaming.bad_record_ignore")
.option("dbName", tableIdentifier.database.get)
.option("tableName", tableIdentifier.table)
.option("timestampformat", CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+ .option(CarbonStreamParser.CARBON_STREAM_PARSER,
+ CarbonStreamParser.CARBON_STREAM_PARSER_CSV)
.start()
qry.awaitTermination()
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1163524/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
index a6b0fec..39d63bf 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
@@ -35,7 +35,6 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.streaming.parser.CarbonStreamParser
case class FileElement(school: Array[String], age: Integer)
case class StreamData(id: Integer, name: String, city: String, salary: java.lang.Float,
@@ -783,8 +782,6 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {
.option(CarbonCommonConstants.HANDOFF_SIZE, handoffSize)
.option("timestampformat", CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
.option(CarbonCommonConstants.ENABLE_AUTO_HANDOFF, autoHandoff)
- .option(CarbonStreamParser.CARBON_STREAM_PARSER,
- CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)
.start()
qry.awaitTermination()
} catch {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1163524/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java b/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java
index e335626..94f0307 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java
@@ -28,12 +28,14 @@ public interface CarbonStreamParser {
String CARBON_STREAM_PARSER = "carbon.stream.parser";
- String CARBON_STREAM_PARSER_DEFAULT =
+ String CARBON_STREAM_PARSER_CSV =
"org.apache.carbondata.streaming.parser.CSVStreamParserImp";
String CARBON_STREAM_PARSER_ROW_PARSER =
"org.apache.carbondata.streaming.parser.RowStreamParserImp";
+ String CARBON_STREAM_PARSER_DEFAULT = CARBON_STREAM_PARSER_ROW_PARSER;
+
void initialize(Configuration configuration, StructType structType);
Object[] parserRow(InternalRow value);
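Because 'carbon.stream.parser' takes a class name, a custom parser can implement this interface as well. A minimal Scala sketch, assuming only the members visible in this hunk; the class name and delimiter are hypothetical, and a no-op close() is included in case the full interface declares one:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType

import org.apache.carbondata.streaming.parser.CarbonStreamParser

// Hypothetical parser that splits a pipe-delimited line held in the first column.
class PipeStreamParser extends CarbonStreamParser {

  def initialize(configuration: Configuration, structType: StructType): Unit = {
    // no state to set up in this sketch
  }

  def parserRow(value: InternalRow): Array[Object] = {
    // socket-style sources put the whole line in field 0 as a string;
    // the runtime cast is safe because String[] is an Object[] on the JVM
    value.getString(0).split('|').asInstanceOf[Array[Object]]
  }

  def close(): Unit = {}
}
```

The parser would then be selected with `.option(CarbonStreamParser.CARBON_STREAM_PARSER, "com.example.PipeStreamParser")` (a hypothetical fully qualified name), assuming the class is on the executor classpath.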