You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/06/22 01:34:26 UTC

[25/50] [abbrv] carbondata git commit: [CARBONDATA-2599] Use RowStreamParserImp as default value of config 'carbon.stream.parser'

[CARBONDATA-2599] Use RowStreamParserImp as default value of config 'carbon.stream.parser'

Parser 'RowStreamParserImpl' is used more often for real scene, so use 'RowStreamParserImpl' as default value of config 'carbon.stream.parser'

This closes #2370


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f1163524
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f1163524
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f1163524

Branch: refs/heads/carbonstore
Commit: f1163524f5adab2dfeab992e17d6aac2b5bacf47
Parents: efad40d
Author: Zhang Zhichao <44...@qq.com>
Authored: Tue Jun 12 23:55:57 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Fri Jun 15 01:38:27 2018 +0800

----------------------------------------------------------------------
 docs/streaming-guide.md                                  | 11 ++++++-----
 .../carbondata/examples/SparkStreamingExample.scala      |  3 ---
 .../examples/StreamingWithRowParserExample.scala         |  3 ---
 .../carbondata/examples/StructuredStreamingExample.scala |  3 +++
 .../spark/carbondata/TestStreamingTableOperation.scala   |  5 +++++
 .../carbondata/TestStreamingTableWithRowParser.scala     |  3 ---
 .../carbondata/streaming/parser/CarbonStreamParser.java  |  4 +++-
 7 files changed, 17 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1163524/docs/streaming-guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming-guide.md b/docs/streaming-guide.md
index a9b174f..a9284e6 100644
--- a/docs/streaming-guide.md
+++ b/docs/streaming-guide.md
@@ -28,6 +28,7 @@ Start spark-shell in new terminal, type :paste, then copy and run the following
  import org.apache.spark.sql.CarbonSession._
  import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
  import org.apache.carbondata.core.util.path.CarbonTablePath
+ import org.apache.carbondata.streaming.parser.CarbonStreamParser
 
  val warehouse = new File("./warehouse").getCanonicalPath
  val metastore = new File("./metastore").getCanonicalPath
@@ -71,6 +72,8 @@ Start spark-shell in new terminal, type :paste, then copy and run the following
    .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(tablePath))
    .option("dbName", "default")
    .option("tableName", "carbon_table")
+   .option(CarbonStreamParser.CARBON_STREAM_PARSER,
+     CarbonStreamParser.CARBON_STREAM_PARSER_CSV)
    .start()
 
  // start new thread to show data
@@ -157,13 +160,13 @@ Config the property "carbon.stream.parser" to define a stream parser to convert
 
 property name | default | description
 --- | --- | ---
-carbon.stream.parser | org.apache.carbondata.streaming.parser.CSVStreamParserImp | the class of the stream parser
+carbon.stream.parser | org.apache.carbondata.streaming.parser.RowStreamParserImp | the class of the stream parser
 
 Currently CarbonData support two parsers, as following:
 
-**1. org.apache.carbondata.streaming.parser.CSVStreamParserImp**: This is the default stream parser, it gets a line data(String type) from the first index of InternalRow and converts this String to Object[].
+**1. org.apache.carbondata.streaming.parser.CSVStreamParserImp**: This parser gets a line data(String type) from the first index of InternalRow and converts this String to Object[].
 
-**2. org.apache.carbondata.streaming.parser.RowStreamParserImp**: This stream parser will auto convert InternalRow to Object[] according to schema of this `DataSet`, for example:
+**2. org.apache.carbondata.streaming.parser.RowStreamParserImp**: This is the default stream parser, it will auto convert InternalRow to Object[] according to schema of this `DataSet`, for example:
 
 ```scala
  case class FileElement(school: Array[String], age: Int)
@@ -191,8 +194,6 @@ Currently CarbonData support two parsers, as following:
    .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
    .option("dbName", "default")
    .option("tableName", "carbon_table")
-   .option(CarbonStreamParser.CARBON_STREAM_PARSER,
-     CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)
    .start()
 
  ...

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1163524/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkStreamingExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkStreamingExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkStreamingExample.scala
index 27ea893..beaeee1 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkStreamingExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkStreamingExample.scala
@@ -30,7 +30,6 @@ import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.examples.util.ExampleUtils
 import org.apache.carbondata.streaming.CarbonSparkStreamingListener
-import org.apache.carbondata.streaming.parser.CarbonStreamParser
 
 /**
  * This example introduces how to use Spark Streaming to write data
@@ -172,8 +171,6 @@ object SparkStreamingExample {
           " at batch time: " + time.toString() +
           " the count of received data: " + df.count())
         CarbonSparkStreamingFactory.getStreamSparkStreamingWriter(spark, "default", tableName)
-          .option(CarbonStreamParser.CARBON_STREAM_PARSER,
-            CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)
           .mode(SaveMode.Append)
           .writeStreamData(df, time)
       }}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1163524/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingWithRowParserExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingWithRowParserExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingWithRowParserExample.scala
index 109629e..ceb3d0f 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingWithRowParserExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingWithRowParserExample.scala
@@ -25,7 +25,6 @@ import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
 
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.examples.util.ExampleUtils
-import org.apache.carbondata.streaming.parser.CarbonStreamParser
 
 case class FileElement(school: Array[String], age: Int)
 case class StreamData(id: Int, name: String, city: String, salary: Float, file: FileElement)
@@ -170,8 +169,6 @@ object StreamingWithRowParserExample {
             .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(tablePath))
             .option("dbName", "default")
             .option("tableName", "stream_table_with_row_parser")
-            .option(CarbonStreamParser.CARBON_STREAM_PARSER,
-              CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)
             .start()
 
           qry.awaitTermination()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1163524/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala
index 38a1941..f88d8ee 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.examples.util.ExampleUtils
+import org.apache.carbondata.streaming.parser.CarbonStreamParser
 
 // scalastyle:off println
 object StructuredStreamingExample {
@@ -156,6 +157,8 @@ object StructuredStreamingExample {
               CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath))
             .option("dbName", "default")
             .option("tableName", "stream_table")
+            .option(CarbonStreamParser.CARBON_STREAM_PARSER,
+              CarbonStreamParser.CARBON_STREAM_PARSER_CSV)
             .start()
 
           qry.awaitTermination()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1163524/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 325722d..3253c3d 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -40,6 +40,7 @@ import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.spark.exception.ProcessMetaDataException
+import org.apache.carbondata.streaming.parser.CarbonStreamParser
 
 class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
 
@@ -1713,6 +1714,8 @@ sql("drop table if exists streaming.bad_record_ignore")
             .option("BAD_RECORD_PATH", badRecordsPath)
             .option("dbName", tableIdentifier.database.get)
             .option("tableName", tableIdentifier.table)
+            .option(CarbonStreamParser.CARBON_STREAM_PARSER,
+              CarbonStreamParser.CARBON_STREAM_PARSER_CSV)
             .option(CarbonCommonConstants.HANDOFF_SIZE, handoffSize)
             .option("timestampformat", CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
             .option(CarbonCommonConstants.ENABLE_AUTO_HANDOFF, autoHandoff)
@@ -1830,6 +1833,8 @@ sql("drop table if exists streaming.bad_record_ignore")
             .option("dbName", tableIdentifier.database.get)
             .option("tableName", tableIdentifier.table)
             .option("timestampformat", CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+            .option(CarbonStreamParser.CARBON_STREAM_PARSER,
+              CarbonStreamParser.CARBON_STREAM_PARSER_CSV)
             .start()
 
           qry.awaitTermination()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1163524/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
index a6b0fec..39d63bf 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
@@ -35,7 +35,6 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.streaming.parser.CarbonStreamParser
 
 case class FileElement(school: Array[String], age: Integer)
 case class StreamData(id: Integer, name: String, city: String, salary: java.lang.Float,
@@ -783,8 +782,6 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {
             .option(CarbonCommonConstants.HANDOFF_SIZE, handoffSize)
             .option("timestampformat", CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
             .option(CarbonCommonConstants.ENABLE_AUTO_HANDOFF, autoHandoff)
-            .option(CarbonStreamParser.CARBON_STREAM_PARSER,
-              CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)
             .start()
           qry.awaitTermination()
         } catch {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1163524/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java b/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java
index e335626..94f0307 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java
@@ -28,12 +28,14 @@ public interface CarbonStreamParser {
 
   String CARBON_STREAM_PARSER = "carbon.stream.parser";
 
-  String CARBON_STREAM_PARSER_DEFAULT =
+  String CARBON_STREAM_PARSER_CSV =
       "org.apache.carbondata.streaming.parser.CSVStreamParserImp";
 
   String CARBON_STREAM_PARSER_ROW_PARSER =
       "org.apache.carbondata.streaming.parser.RowStreamParserImp";
 
+  String CARBON_STREAM_PARSER_DEFAULT = CARBON_STREAM_PARSER_ROW_PARSER;
+
   void initialize(Configuration configuration, StructType structType);
 
   Object[] parserRow(InternalRow value);