Posted to commits@carbondata.apache.org by ch...@apache.org on 2017/08/06 04:17:41 UTC
carbondata git commit: [CARBONDATA-1361] Reduced the SDV cluster time to 22 minutes
Repository: carbondata
Updated Branches:
refs/heads/master a5e7b41b5 -> 8c17ceead
[CARBONDATA-1361] Reduced the SDV cluster time to 22 minutes
This closes #1225
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8c17ceea
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8c17ceea
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8c17ceea
Branch: refs/heads/master
Commit: 8c17ceeadffbb8525abc5dd5ea2435ec1bd1c864
Parents: a5e7b41
Author: Ravindra Pesala <ra...@gmail.com>
Authored: Fri Aug 4 19:22:09 2017 +0530
Committer: chenliang613 <ch...@apache.org>
Committed: Sun Aug 6 12:17:24 2017 +0800
----------------------------------------------------------------------
 assembly/pom.xml                                |  8 +--
 .../carbondata/core/util/SessionParams.java     | 21 +++++-
 integration/spark-common-cluster-test/pom.xml   |  4 +-
 .../cluster/sdv/suite/SDVSuites.scala           | 73 ++++++++++++--------
 .../apache/carbondata/spark/rdd/CarbonRDD.scala | 15 ++--
 .../sql/test/ResourceRegisterAndCopier.scala    | 52 +++++++++++---
 .../spark/sql/test/TestQueryExecutor.scala      |  1 +
 .../sql/test/Spark2TestQueryExecutor.scala      |  9 +--
 8 files changed, 128 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c17ceea/assembly/pom.xml
----------------------------------------------------------------------
diff --git a/assembly/pom.xml b/assembly/pom.xml
index ee9e460..f4d4f70 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -138,10 +138,6 @@
</profile>
<profile>
<id>spark-1.6</id>
- <!-- default -->
- <activation>
- <activeByDefault>true</activeByDefault>
- </activation>
<dependencies>
<dependency>
<groupId>org.apache.carbondata</groupId>
@@ -152,6 +148,10 @@
</profile>
<profile>
<id>spark-2.1</id>
+ <!-- default -->
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
<dependencies>
<dependency>
<groupId>org.apache.carbondata</groupId>
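
Note: with this change the spark-2.1 profile replaces spark-1.6 as the default
for the assembly module, so a plain "mvn clean package" now builds against
Spark 2.1; building against Spark 1.6 requires selecting the profile
explicitly, e.g. "mvn clean package -Pspark-1.6".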
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c17ceea/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index f06ba01..6d8c900 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -29,7 +29,15 @@ import org.apache.carbondata.core.exception.InvalidConfigurationException;
import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION;
import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_UNSAFE_SORT;
-import static org.apache.carbondata.core.constants.CarbonLoadOptionConstants.*;
+import static org.apache.carbondata.core.constants.CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION;
+import static org.apache.carbondata.core.constants.CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE;
+import static org.apache.carbondata.core.constants.CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH;
+import static org.apache.carbondata.core.constants.CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB;
+import static org.apache.carbondata.core.constants.CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT;
+import static org.apache.carbondata.core.constants.CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS;
+import static org.apache.carbondata.core.constants.CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD;
+import static org.apache.carbondata.core.constants.CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS;
+import static org.apache.carbondata.core.constants.CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE;
/**
* This class maintains carbon session params
@@ -40,9 +48,11 @@ public class SessionParams implements Serializable {
LogServiceFactory.getLogService(CacheProvider.class.getName());
private Map<String, String> sProps;
+ private Map<String, String> addedProps;
public SessionParams() {
sProps = new HashMap<>();
+ addedProps = new HashMap<>();
}
/**
@@ -70,6 +80,15 @@ public class SessionParams implements Serializable {
return this;
}
+ public SessionParams addProps(Map<String, String> addedProps) {
+ this.addedProps.putAll(addedProps);
+ return this;
+ }
+
+ public Map<String, String> getAddedProps() {
+ return addedProps;
+ }
+
/**
* validate the key value to be set using set command
* @param key
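
Note: the new addProps/getAddedProps pair keeps a separate map of properties
that bypasses the validated set-command path below. A minimal usage sketch in
Scala (the property key here is only an example chosen for illustration, not
taken from this commit):

  import java.util.{HashMap => JHashMap}
  import org.apache.carbondata.core.util.SessionParams

  val params = new SessionParams()
  val extra = new JHashMap[String, String]()
  extra.put("enable.unsafe.sort", "true")  // example key, for illustration only
  params.addProps(extra)                   // merged into the internal addedProps map
  assert(params.getAddedProps.get("enable.unsafe.sort") == "true")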
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c17ceea/integration/spark-common-cluster-test/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/pom.xml b/integration/spark-common-cluster-test/pom.xml
index 0081e4e..e6ae541 100644
--- a/integration/spark-common-cluster-test/pom.xml
+++ b/integration/spark-common-cluster-test/pom.xml
@@ -124,7 +124,7 @@
<include>**/*Suite.java</include>
</includes>
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
- <argLine>-Xmx4g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m</argLine>
+ <argLine>-Xmx6g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m</argLine>
<systemProperties>
<java.awt.headless>true</java.awt.headless>
</systemProperties>
@@ -141,7 +141,7 @@
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
<junitxml>.</junitxml>
<filereports>CarbonTestSuite.txt</filereports>
- <argLine>-ea -Xmx6g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
+ <argLine>-ea -Xmx5g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
</argLine>
<stderr />
<environmentVariables>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c17ceea/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala
index de946a0..b9908ea 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala
@@ -70,21 +70,13 @@ class SDVSuites extends Suites with BeforeAndAfterAll {
*/
class SDVSuites1 extends Suites with BeforeAndAfterAll {
- val suites = new AlterTableTestCase ::
- new BadRecordTestCase ::
- new BatchSortLoad1TestCase ::
- new BatchSortLoad2TestCase ::
- new BatchSortQueryTestCase ::
- new ColumndictTestCase ::
- new DataLoadingTestCase ::
- new DataLoadingV3TestCase ::
- new InvertedindexTestCase ::
- new OffheapQuery1TestCase ::
- new OffheapQuery2TestCase ::
- new OffheapSort1TestCase ::
- new OffheapSort2TestCase ::
- new PartitionTestCase ::
- new QueriesBasicTestCase :: Nil
+ val suites = new BadRecordTestCase ::
+ new BatchSortLoad1TestCase ::
+ new BatchSortQueryTestCase ::
+ new DataLoadingTestCase ::
+ new OffheapSort2TestCase ::
+ new PartitionTestCase ::
+ new QueriesBasicTestCase :: Nil
override val nestedSuites = suites.toIndexedSeq
@@ -100,20 +92,43 @@ class SDVSuites1 extends Suites with BeforeAndAfterAll {
*/
class SDVSuites2 extends Suites with BeforeAndAfterAll {
- val suites = new QueriesBVATestCase ::
- new QueriesCompactionTestCase ::
- new QueriesExcludeDictionaryTestCase ::
- new QueriesIncludeDictionaryTestCase ::
- new QueriesNormalTestCase ::
- new QueriesRangeFilterTestCase ::
- new QueriesSparkBlockDistTestCase ::
- new ShowLoadsTestCase ::
- new SinglepassTestCase ::
- new SortColumnTestCase ::
- new TimestamptypesTestCase ::
- new V3offheapvectorTestCase ::
- new Vector1TestCase ::
- new Vector2TestCase :: Nil
+ val suites = new QueriesBVATestCase ::
+ new QueriesCompactionTestCase ::
+ new QueriesExcludeDictionaryTestCase ::
+ new QueriesIncludeDictionaryTestCase :: Nil
+
+ override val nestedSuites = suites.toIndexedSeq
+
+ override protected def afterAll() = {
+ println("---------------- Stopping spark -----------------")
+ TestQueryExecutor.INSTANCE.stop()
+ println("---------------- Stopped spark -----------------")
+ }
+}
+
+/**
+ * Suite class for all tests.
+ */
+class SDVSuites3 extends Suites with BeforeAndAfterAll {
+
+ val suites = new AlterTableTestCase ::
+ new BatchSortLoad2TestCase ::
+ new InvertedindexTestCase ::
+ new OffheapQuery1TestCase ::
+ new OffheapQuery2TestCase ::
+ new OffheapSort1TestCase ::
+ new ShowLoadsTestCase ::
+ new SinglepassTestCase ::
+ new SortColumnTestCase ::
+ new TimestamptypesTestCase ::
+ new V3offheapvectorTestCase ::
+ new Vector1TestCase ::
+ new Vector2TestCase ::
+ new QueriesNormalTestCase ::
+ new ColumndictTestCase ::
+ new QueriesRangeFilterTestCase ::
+ new QueriesSparkBlockDistTestCase ::
+ new DataLoadingV3TestCase :: Nil
override val nestedSuites = suites.toIndexedSeq
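
Note: the fifteen test classes previously split across SDVSuites1 and
SDVSuites2 are rebalanced into three smaller buckets (SDVSuites1-3) so the
cluster run can be split into shorter executions. The nesting is plain
ScalaTest; a minimal self-contained sketch of the same pattern (the nested
suites here are trivial placeholders, not the real SDV test cases):

  import org.scalatest.{BeforeAndAfterAll, FunSuite, Suites}

  class FirstTestCase extends FunSuite { test("a") { assert(1 + 1 == 2) } }
  class SecondTestCase extends FunSuite { test("b") { assert(true) } }

  class ExampleBucket extends Suites with BeforeAndAfterAll {
    val suites = new FirstTestCase :: new SecondTestCase :: Nil
    override val nestedSuites = suites.toIndexedSeq
    override protected def afterAll(): Unit = {
      // tear down shared state once; the real suites stop the Spark session here
    }
  }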
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c17ceea/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
index b137d6d..a35c896 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
@@ -34,9 +34,15 @@ import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, Car
abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]) extends RDD[T](sc, deps) {
- val carbonSessionInfo: CarbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
-
-// val addedProperty = CarbonProperties.getInstance().getAddedProperty
+ val carbonSessionInfo: CarbonSessionInfo = {
+ var info = ThreadLocalSessionInfo.getCarbonSessionInfo
+ if (info == null || info.getSessionParams == null) {
+ info = new CarbonSessionInfo
+ info.setSessionParams(new SessionParams())
+ }
+ info.getSessionParams.addProps(CarbonProperties.getInstance().getAddedProperty)
+ info
+ }
/** Construct an RDD with just a one-to-one dependency on one parent */
def this(@transient oneParent: RDD[_]) =
@@ -50,7 +56,8 @@ abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
val carbonTaskInfo = new CarbonTaskInfo
carbonTaskInfo.setTaskId(System.nanoTime)
ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo)
-// addedProperty.asScala.map(f => CarbonProperties.getInstance().addProperty(f._1, f._2))
+ carbonSessionInfo.getSessionParams.getAddedProps.asScala.
+ map(f => CarbonProperties.getInstance().addProperty(f._1, f._2))
internalCompute(split, context)
}
}
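
Note: this replaces the previously commented-out property propagation. The
session info, plus any properties added through CarbonProperties, is captured
on the driver when the RDD is constructed, serialized with the RDD, and
replayed into the executor-side CarbonProperties before internalCompute runs.
A simplified, self-contained sketch of the pattern (all names are stand-ins,
not CarbonData APIs):

  import scala.collection.mutable
  import scala.reflect.ClassTag
  import org.apache.spark.{Partition, SparkContext, TaskContext}
  import org.apache.spark.rdd.RDD

  // stand-in for the executor-local properties singleton
  object LocalProps { val map = mutable.Map[String, String]() }

  abstract class PropagatingRDD[T: ClassTag](@transient sc: SparkContext)
    extends RDD[T](sc, Nil) {

    // evaluated on the driver at construction time, shipped with the RDD
    val captured: Map[String, String] = Map("example.key" -> "example.value")

    final override def compute(split: Partition, context: TaskContext): Iterator[T] = {
      // replay driver-side settings into this executor JVM before real work
      captured.foreach { case (k, v) => LocalProps.map(k) = v }
      internalCompute(split, context)
    }

    def internalCompute(split: Partition, context: TaskContext): Iterator[T]
  }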
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c17ceea/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala
index 6364630..b025c4c 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.io.IOUtils
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.locks.HdfsFileLock
import org.apache.carbondata.core.util.CarbonUtil
/**
@@ -46,19 +47,48 @@ object ResourceRegisterAndCopier {
if (!file.exists()) {
sys.error(s"""Provided path $hdfsPath does not exist""")
}
- val resources = readDataFiles(dataFilesPath)
- resources.foreach { file =>
- val hdfsDataPath = hdfsPath + "/" + file
- val rsFile = FileFactory.getCarbonFile(hdfsDataPath, fileType)
- if (!rsFile.exists()) {
- val target = resourcePath + "/" + file
- new File(resourcePath + "/" + file.substring(0, file.lastIndexOf("/"))).mkdirs()
- downloadFile(link, file, target)
- // copy it
- copyLocalFile(hdfsDataPath, target)
- new File(target).delete()
+ val lock = new HdfsFileLock("", "resource.lock")
+ var bool = false
+ try {
+ bool = lockWithRetries(lock)
+ if (bool) {
+ val resources = readDataFiles(dataFilesPath)
+ resources.foreach { file =>
+ val hdfsDataPath = hdfsPath + "/" + file
+ val rsFile = FileFactory.getCarbonFile(hdfsDataPath, fileType)
+ if (!rsFile.exists()) {
+ val target = resourcePath + "/" + file
+ new File(resourcePath + "/" + file.substring(0, file.lastIndexOf("/"))).mkdirs()
+ downloadFile(link, file, target)
+ // copy it
+ copyLocalFile(hdfsDataPath, target)
+ new File(target).delete()
+ }
+ }
}
+ } finally {
+ if (bool) {
+ lock.unlock()
+ }
+ }
+ }
+
+ def lockWithRetries(lock: HdfsFileLock): Boolean = {
+ try {
+ var i = 0
+ while (i < 10) {
+ if (lock.lock()) {
+ return true
+ } else {
+ Thread.sleep(30 * 1000L)
+ }
+ i += 1
+ }
+ } catch {
+ case e: InterruptedException =>
+ return false
}
+ false
}
def readDataFiles(dataFilesPath: String): Seq[String] = {
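
Note: since several test JVMs may share the HDFS resource directory, the
download-and-copy step is now guarded by a resource.lock file, and
lockWithRetries polls for the lock up to 10 times at 30-second intervals
(about five minutes in total) before giving up. The retry idiom in isolation
(the Lock trait is a stand-in for HdfsFileLock):

  trait Lock { def lock(): Boolean; def unlock(): Unit }

  def lockWithRetries(lock: Lock, attempts: Int = 10, waitMs: Long = 30 * 1000L): Boolean = {
    try {
      for (_ <- 1 to attempts) {
        if (lock.lock()) return true else Thread.sleep(waitMs)
      }
    } catch {
      case _: InterruptedException => return false
    }
    false
  }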
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c17ceea/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
index cf90912..6f177e6 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
@@ -71,6 +71,7 @@ object TestQueryExecutor {
}
val resourcesPath = if (hdfsUrl.startsWith("hdfs://")) {
+ System.setProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION, hdfsUrl)
ResourceRegisterAndCopier.
copyResourcesifNotExists(hdfsUrl, s"$integrationPath/spark-common-test/src/test/resources",
s"$integrationPath//spark-common-cluster-test/src/test/resources/testdatafileslist.txt")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c17ceea/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
index 8aa25a4..2c163a2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
@@ -53,10 +53,11 @@ object Spark2TestQueryExecutor {
val conf = new SparkConf()
if (!TestQueryExecutor.masterUrl.startsWith("local")) {
conf.setJars(TestQueryExecutor.jars).
- set("spark.driver.memory", "4g").
- set("spark.executor.memory", "8g").
- set("spark.executor.cores", "4").
- set("spark.cores.max", "8")
+ set("spark.driver.memory", "6g").
+ set("spark.executor.memory", "4g").
+ set("spark.executor.cores", "2").
+ set("spark.executor.instances", "2").
+ set("spark.cores.max", "4")
FileFactory.getConfiguration.
set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER")
}
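
Note: the reshaped test cluster pins exactly two 2-core, 4g executors and caps
total executor cores at 4 instead of 8, roughly halving the executor memory
footprint, while the driver heap grows from 4g to 6g.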