Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/05/03 01:52:23 UTC

carbondata git commit: [CARBONDATA-2379] Support SearchModeExample run in cluster

Repository: carbondata
Updated Branches:
  refs/heads/master 4b8dc0a58 -> 46cee146d


[CARBONDATA-2379] Support SearchModeExample run in cluster

1. Support SearchModeExample running in a cluster (see the usage sketch below)
2. Change the worker hostname to hostAddress
3. Support running ConcurrentQueryBenchmark with search mode
4. Remove read.close, which may lead to a JVM crash
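
A minimal sketch of the workflow these changes enable, assembled from the diffs below; the master URL, store location and query are illustrative placeholders, not part of this commit:

import org.apache.spark.sql.{CarbonSession, SparkSession}
import org.apache.spark.sql.CarbonSession._

// resolve the master from the environment so the same code runs locally or on a cluster
val master = sys.props.get("spark.master")
  .orElse(sys.env.get("MASTER"))
  .getOrElse("local[8]")

val spark = SparkSession
  .builder()
  .master(master)
  .appName("SearchModeExample")
  .getOrCreateCarbonSession("hdfs://<namenode>/carbon/store")  // placeholder store location

spark.asInstanceOf[CarbonSession].startSearchMode()  // start the gRPC servers on the workers
spark.sql("SELECT charField FROM carbonsession_table WHERE charField = 'a'").show()
spark.asInstanceOf[CarbonSession].stopSearchMode()   // stop the gRPC servers
spark.close()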

This closes #2173


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/46cee146
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/46cee146
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/46cee146

Branch: refs/heads/master
Commit: 46cee146db36617ea20c7bdaa2415e9bde02f63d
Parents: 4b8dc0a
Author: xubo245 <60...@qq.com>
Authored: Mon Apr 23 10:39:59 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Thu May 3 09:52:09 2018 +0800

----------------------------------------------------------------------
 .../benchmark/ConcurrentQueryBenchmark.scala    | 78 ++++++++++++++------
 .../benchmark/SimpleQueryBenchmark.scala        |  6 +-
 .../carbondata/examples/SearchModeExample.scala | 47 +++++++++---
 .../scala/org/apache/spark/rpc/Master.scala     |  2 +-
 4 files changed, 98 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/46cee146/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala b/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala
index a1a1428..697d13f 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala
@@ -54,7 +54,8 @@ import org.apache.carbondata.spark.util.DataGenerator
  * --executor-memory 24g \
  * --num-executors 3  \
  * concurrencyTest.jar \
- * totalNum threadNum taskNum resultIsEmpty runInLocal generateFile deleteFile
+ * totalNum threadNum taskNum resultIsEmpty runInLocal generateFile
+ * deleteFile openSearchMode storeLocation
  * details in initParameters method of this benchmark
  */
 object ConcurrentQueryBenchmark {
@@ -75,6 +76,10 @@ object ConcurrentQueryBenchmark {
   var generateFile = true
   // whether delete file
   var deleteFile = true
+  // open search mode, default value is false
+  var openSearchMode = false
+  // carbon store location
+  var storeLocation = "/tmp"
 
   val cardinalityId = 100 * 1000 * 1000
   val cardinalityCity = 6
@@ -234,23 +239,24 @@ object ConcurrentQueryBenchmark {
     } else {
       null
     }
-
-    val table1Time = time {
-      if (table1.endsWith("parquet")) {
-        if (generateFile) {
-          generateParquetTable(spark, df, table1)
-        }
-        spark.read.parquet(table1).createOrReplaceTempView(table1)
-      } else if (table1.endsWith("orc")) {
-        if (generateFile) {
-          generateOrcTable(spark, df, table1)
-          spark.read.orc(table1).createOrReplaceTempView(table1)
+    if (!openSearchMode) {
+      val table1Time = time {
+        if (table1.endsWith("parquet")) {
+          if (generateFile) {
+            generateParquetTable(spark, df, storeLocation + "/" + table1)
+          }
+          spark.read.parquet(storeLocation + "/" + table1).createOrReplaceTempView(table1)
+        } else if (table1.endsWith("orc")) {
+          if (generateFile) {
+            generateOrcTable(spark, df, table1)
+            spark.read.orc(table1).createOrReplaceTempView(table1)
+          }
+        } else {
+          sys.error("invalid table: " + table1)
         }
-      } else {
-        sys.error("invalid table: " + table1)
       }
+      println(s"$table1 completed, time: $table1Time sec")
     }
-    println(s"$table1 completed, time: $table1Time sec")
 
     val table2Time: Double = if (generateFile) {
       generateCarbonTable(spark, df, table2)
@@ -417,13 +423,26 @@ object ConcurrentQueryBenchmark {
    */
   def runTest(spark: SparkSession, table1: String, table2: String): Unit = {
     // run queries on parquet and carbon
-    runQueries(spark, table1)
+    if (!openSearchMode) {
+      runQueries(spark, table1)
+    }
     // do GC and sleep for some time before running next table
     System.gc()
     Thread.sleep(1000)
     System.gc()
     Thread.sleep(1000)
     runQueries(spark, table2)
+    if (openSearchMode) {
+      runQueries(spark, table2)
+      // start search mode (start all gRPC server)
+      // following queries will be run using gRPC
+      spark.asInstanceOf[CarbonSession].startSearchMode()
+      println("Open search mode:")
+      runQueries(spark, table2)
+      runQueries(spark, table2)
+      // stop gRPC servers
+      spark.asInstanceOf[CarbonSession].stopSearchMode()
+    }
   }
 
   /**
@@ -468,6 +487,9 @@ object ConcurrentQueryBenchmark {
     }
     if (arr.length > 5) {
       runInLocal = if (arr(5).equalsIgnoreCase("true")) {
+        val rootPath = new File(this.getClass.getResource("/").getPath
+          + "../../../..").getCanonicalPath
+        storeLocation = s"$rootPath/examples/spark2/target/store"
         true
       } else if (arr(5).equalsIgnoreCase("false")) {
         false
@@ -493,6 +515,18 @@ object ConcurrentQueryBenchmark {
         throw new Exception("error parameter, should be true or false")
       }
     }
+    if (arr.length > 8) {
+      openSearchMode = if (arr(8).equalsIgnoreCase("true")) {
+        true
+      } else if (arr(8).equalsIgnoreCase("false")) {
+        false
+      } else {
+        throw new Exception("error parameter, should be true or false")
+      }
+    }
+    if (arr.length > 9) {
+      storeLocation = arr(9)
+    }
   }
 
   /**
@@ -520,12 +554,11 @@ object ConcurrentQueryBenchmark {
       "\tfile path: " + path +
       "\trunInLocal: " + runInLocal +
       "\tgenerateFile: " + generateFile +
-      "\tdeleteFile: " + deleteFile
+      "\tdeleteFile: " + deleteFile +
+      "\topenSearchMode: " + openSearchMode +
+      "\tstoreLocation: " + storeLocation
 
     val spark = if (runInLocal) {
-      val rootPath = new File(this.getClass.getResource("/").getPath
-        + "../../../..").getCanonicalPath
-      val storeLocation = s"$rootPath/examples/spark2/target/store"
       SparkSession
         .builder()
         .appName(parameters)
@@ -537,10 +570,9 @@ object ConcurrentQueryBenchmark {
         .builder()
         .appName(parameters)
         .enableHiveSupport()
-        .getOrCreateCarbonSession()
+        .getOrCreateCarbonSession(storeLocation)
     }
     spark.sparkContext.setLogLevel("ERROR")
-
     println("\nEnvironment information:")
     val env = Array(
       "spark.master",
@@ -560,10 +592,8 @@ object ConcurrentQueryBenchmark {
     // 2. prepareTable
     prepareTable(spark, table1, table2)
 
-    spark.asInstanceOf[CarbonSession].startSearchMode()
     // 3. runTest
     runTest(spark, table1, table2)
-    spark.asInstanceOf[CarbonSession].stopSearchMode()
 
     if (deleteFile) {
       CarbonUtil.deleteFoldersAndFiles(new File(table1))
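
After this change the benchmark's positional parameters are: totalNum threadNum taskNum resultIsEmpty runInLocal generateFile deleteFile openSearchMode storeLocation. A purely illustrative invocation in the spark-submit form already shown in the class comment (every value is a placeholder, not taken from this commit):

spark-submit \
  --executor-memory 24g \
  --num-executors 3 \
  concurrencyTest.jar \
  10000000 16 4 true false true true true hdfs://<namenode>/carbon/store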

http://git-wip-us.apache.org/repos/asf/carbondata/blob/46cee146/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/SimpleQueryBenchmark.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/SimpleQueryBenchmark.scala b/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/SimpleQueryBenchmark.scala
index e9c880b..ce69c66 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/SimpleQueryBenchmark.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/SimpleQueryBenchmark.scala
@@ -314,9 +314,13 @@ object SimpleQueryBenchmark {
     val rootPath = new File(this.getClass.getResource("/").getPath
         + "../../../..").getCanonicalPath
     val storeLocation = s"$rootPath/examples/spark2/target/store"
+    val master = Option(System.getProperty("spark.master"))
+      .orElse(sys.env.get("MASTER"))
+      .orElse(Option("local[8]"))
+
     val spark = SparkSession
         .builder()
-        .master("local")
+        .master(master.get)
         .enableHiveSupport()
         .config("spark.driver.host", "127.0.0.1")
         .getOrCreateCarbonSession(storeLocation)
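
With this change SimpleQueryBenchmark no longer hard-codes a local master: it reads the spark.master system property, then the MASTER environment variable, and only then falls back to local[8]. A minimal sketch of pointing it at a cluster programmatically (the URL is a placeholder; spark-submit normally sets the property for you):

// illustrative only: set the property the benchmark reads before it builds the session
System.setProperty("spark.master", "spark://master-host:7077")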

http://git-wip-us.apache.org/repos/asf/carbondata/blob/46cee146/examples/spark2/src/main/scala/org/apache/carbondata/examples/SearchModeExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/SearchModeExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/SearchModeExample.scala
index 03e724f..aeb4c29 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/SearchModeExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/SearchModeExample.scala
@@ -22,7 +22,8 @@ import java.util.concurrent.{Executors, ExecutorService}
 
 import org.apache.spark.sql.{CarbonSession, SparkSession}
 
-import org.apache.carbondata.examples.util.ExampleUtils
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
 
 /**
  * An example that demonstrate how to run queries in search mode,
@@ -32,16 +33,46 @@ import org.apache.carbondata.examples.util.ExampleUtils
 object SearchModeExample {
 
   def main(args: Array[String]) {
-    val spark = ExampleUtils.createCarbonSession("SearchModeExample")
+    import org.apache.spark.sql.CarbonSession._
+    val master = Option(System.getProperty("spark.master"))
+      .orElse(sys.env.get("MASTER"))
+      .orElse(Option("local[8]"))
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd HH:mm:ss")
+      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy/MM/dd")
+      .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "true")
+      .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "")
+
+    val filePath = if (args.length > 0) {
+      args(0)
+    } else {
+      val rootPath = new File(this.getClass.getResource("/").getPath
+        + "../../../..").getCanonicalPath
+      s"$rootPath/examples/spark2/src/main/resources/data.csv"
+    }
+    val storePath = if (args.length > 1) {
+      args(1)
+    } else {
+      val rootPath = new File(this.getClass.getResource("/").getPath
+        + "../../../..").getCanonicalPath
+      s"$rootPath/examples/spark2/target/store"
+    }
+
+    val spark = SparkSession
+      .builder()
+      .master(master.get)
+      .appName("SearchModeExample")
+      .config("spark.sql.crossJoin.enabled", "true")
+      .getOrCreateCarbonSession(storePath)
+
     spark.sparkContext.setLogLevel("ERROR")
-    exampleBody(spark)
+    exampleBody(spark, filePath)
+    println("Finished!")
     spark.close()
   }
 
-  def exampleBody(spark : SparkSession): Unit = {
-
-    val rootPath = new File(this.getClass.getResource("/").getPath
-                            + "../../../..").getCanonicalPath
+  def exampleBody(spark: SparkSession, path: String): Unit = {
 
     spark.sql("DROP TABLE IF EXISTS carbonsession_table")
 
@@ -64,8 +95,6 @@ object SearchModeExample {
          | TBLPROPERTIES('DICTIONARY_INCLUDE'='dateField, charField')
        """.stripMargin)
 
-    val path = s"$rootPath/examples/spark2/src/main/resources/data.csv"
-
     spark.sql(
       s"""
          | LOAD DATA LOCAL INPATH '$path'
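
Since exampleBody now takes the data file path explicitly, it can be driven against cluster-visible data. A hedged sketch of calling it from other code, assuming an existing CarbonSession named spark (the path is a placeholder):

// illustrative only: reuse the example body with an explicit CSV location
val dataFile = "hdfs://<namenode>/data/data.csv"
SearchModeExample.exampleBody(spark, dataFile)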

http://git-wip-us.apache.org/repos/asf/carbondata/blob/46cee146/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
index bc44fb6..e98a780 100644
--- a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
+++ b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
@@ -210,7 +210,7 @@ class Master(sparkConf: SparkConf, port: Int) {
 
   /**
    * Prune data by using CarbonInputFormat.getSplit
-   * Return a mapping of hostname to list of block
+   * Return a mapping of host address to list of block
    */
   private def pruneBlock(
       table: CarbonTable,