Posted to dev@carbondata.apache.org by cenyuhai11 <ce...@163.com> on 2017/10/27 03:13:01 UTC

[DISCUSSION] Improve insert process

When I insert data into CarbonData from another table, I have to do the
following:
1. select count(1) from table1
and then
2. insert into table table1 select * from table1

Why do I have to execute "select count(1) from table1" first?
Because the number of tasks is computed by CarbonData, and it is related to
how many executor hosts we have at that moment!

I don't think this is the right way. We should let Spark control the number
of tasks.
Setting the parameter "mapred.max.split.size" is a common way to adjust the
number of tasks (a sketch follows below).

Even when I do step 2, some tasks still fail, which increases the
insert time.

So I suggest that we do not adjust the number of tasks and just use the
default behavior of Spark.
And then, if there are small files, add a fast merge job (merging data at the
blocklet level, just as )

We also need to set the default value of
"carbon.number.of.cores.while.loading" to 1.







--
Sent from: http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/

Re: [DISCUSSION] Improve insert process

Posted by 岑玉海 <ce...@163.com>.
Hi, Jacky


    Do you know my pain?


scala>  
     |  carbon.sql("insert overwrite table dm_test.dm_trd_wide_carbondata select * from dm.dm_trd_wide where dt = '2017-10-10' ").collect
17/10/30 12:19:27 AUDIT [org.apache.carbondata.spark.rdd.CarbonDataRDDFactory$(207) -- main]: [sh-hadoop-datanode-250-104.elenet.me][master][Thread-1]Data load request has been received for table dm_test.dm_trd_wide_carbondata
[Stage 0:>                                                        (0 + 85) / 85]17/10/30 12:19:39 ERROR [org.apache.carbondata.spark.rdd.CarbonDataRDDFactory$(143) -- main]: main load data frame failed
org.apache.spark.SparkException: Job 0 cancelled as part of cancellation of all jobs
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1441)
        at org.apache.spark.scheduler.DAGScheduler.handleJobCancellation(DAGScheduler.scala:1381)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$doCancelAllJobs$1.apply$mcVI$sp(DAGScheduler.scala:726)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$doCancelAllJobs$1.apply(DAGScheduler.scala:726)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$doCancelAllJobs$1.apply(DAGScheduler.scala:726)
        at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
        at org.apache.spark.scheduler.DAGScheduler.doCancelAllJobs(DAGScheduler.scala:726)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1634)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1600)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1972)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1985)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1998)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2012)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:944)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:370)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:943)
        at org.apache.carbondata.spark.rdd.CarbonDataRDDFactory$.loadDataFrame$1(CarbonDataRDDFactory.scala:761)
        at org.apache.carbondata.spark.rdd.CarbonDataRDDFactory$.loadCarbonData(CarbonDataRDDFactory.scala:922)
        at org.apache.spark.sql.execution.command.LoadTable.processData(carbonTableSchema.scala:1046)
        at org.apache.spark.sql.execution.command.LoadTable.run(carbonTableSchema.scala:754)
        at org.apache.spark.sql.execution.command.LoadTableByInsert.processData(carbonTableSchema.scala:651)
        at org.apache.spark.sql.execution.command.LoadTableByInsert.run(carbonTableSchema.scala:637)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:67)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:180)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:65)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:619)
        at $line18.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:37)
        at $line18.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:42)
        at $line18.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:44)
        at $line18.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:46)
        at $line18.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:48)
        at $line18.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:50)
        at $line18.$read$$iw$$iw$$iw$$iw.<init>(<console>:52)
        at $line18.$read$$iw$$iw$$iw.<init>(<console>:54)
        at $line18.$read$$iw$$iw.<init>(<console>:56)
        at $line18.$read$$iw.<init>(<console>:58)
        at $line18.$read.<init>(<console>:60)
        at $line18.$read$.<init>(<console>:64)
        at $line18.$read$.<clinit>(<console>)
        at $line18.$eval$.$print$lzycompute(<console>:7)
        at $line18.$eval$.$print(<console>:6)
        at $line18.$eval.$print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)
        at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047)
        at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:638)
        at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:637)
        at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
        at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
        at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:637)
        at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:569)
        at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:565)
        at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:807)
        at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:825)
        at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:681)
        at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:395)
        at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:415)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:923)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
        at scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:97)
        at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:909)
        at org.apache.spark.repl.Main$.doMain(Main.scala:69)
        at org.apache.spark.repl.Main$.main(Main.scala:52)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:186)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:211)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
17/10/30 12:19:39 ERROR [org.apache.carbondata.spark.rdd.CarbonDataRDDFactory$(143) -- main]: main 
org.apache.spark.SparkException: Job 0 cancelled as part of cancellation of all jobs
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1441)
        at org.apache.spark.scheduler.DAGScheduler.handleJobCancellation(DAGScheduler.scala:1381)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$doCancelAllJobs$1.apply$mcVI$sp(DAGScheduler.scala:726)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$doCancelAllJobs$1.apply(DAGScheduler.scala:726)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$doCancelAllJobs$1.apply(DAGScheduler.scala:726)
        at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
        at org.apache.spark.scheduler.DAGScheduler.doCancelAllJobs(DAGScheduler.scala:726)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1634)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1600)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1972)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1985)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1998)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2012)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:944)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:370)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:943)
        at org.apache.carbondata.spark.rdd.CarbonDataRDDFactory$.loadDataFrame$1(CarbonDataRDDFactory.scala:761)
        at org.apache.carbondata.spark.rdd.CarbonDataRDDFactory$.loadCarbonData(CarbonDataRDDFactory.scala:922)
        at org.apache.spark.sql.execution.command.LoadTable.processData(carbonTableSchema.scala:1046)
        at org.apache.spark.sql.execution.command.LoadTable.run(carbonTableSchema.scala:754)
        at org.apache.spark.sql.execution.command.LoadTableByInsert.processData(carbonTableSchema.scala:651)
        at org.apache.spark.sql.execution.command.LoadTableByInsert.run(carbonTableSchema.scala:637)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:67)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:180)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:65)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:619)
        at $line18.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:37)
        at $line18.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:42)
        at $line18.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:44)
        at $line18.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:46)
        at $line18.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:48)
        at $line18.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:50)
        at $line18.$read$$iw$$iw$$iw$$iw.<init>(<console>:52)
        at $line18.$read$$iw$$iw$$iw.<init>(<console>:54)
        at $line18.$read$$iw$$iw.<init>(<console>:56)
        at $line18.$read$$iw.<init>(<console>:58)
        at $line18.$read.<init>(<console>:60)
        at $line18.$read$.<init>(<console>:64)
        at $line18.$read$.<clinit>(<console>)
        at $line18.$eval$.$print$lzycompute(<console>:7)
        at $line18.$eval$.$print(<console>:6)
        at $line18.$eval.$print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)
        at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047)
        at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:638)
        at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:637)
        at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
        at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
        at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:637)
        at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:569)
        at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:565)
        at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:807)
        at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:825)
        at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:681)
        at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:395)
        at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:415)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:923)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
        at scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:97)
        at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:909)
        at org.apache.spark.repl.Main$.doMain(Main.scala:69)
        at org.apache.spark.repl.Main$.main(Main.scala:52)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:186)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:211)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
17/10/30 12:19:39 AUDIT [org.apache.carbondata.spark.rdd.CarbonDataRDDFactory$(207) -- main]: [sh-hadoop-datanode-250-104.elenet.me][master][Thread-1]Data load is failed for dm_test.dm_trd_wide_carbondata
17/10/30 12:19:39 ERROR [org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile(141) -- main]: main Exception occurred:File does not exist: hdfs://bipcluster/user/master/carbon/store/dm_test/dm_trd_wide_carbondata/Fact/Part0/Segment_0
17/10/30 12:19:39 ERROR [org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile(141) -- main]: main Exception occurred:File does not exist: hdfs://bipcluster/user/master/carbon/store/dm_test/dm_trd_wide_carbondata/Fact/Part0/Segment_1
17/10/30 12:19:39 ERROR [org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile(141) -- main]: main Exception occurred:File does not exist: hdfs://bipcluster/user/master/carbon/store/dm_test/dm_trd_wide_carbondata/Fact/Part0/Segment_2
17/10/30 12:19:39 ERROR [org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile(141) -- main]: main Exception occurred:File does not exist: hdfs://bipcluster/user/master/carbon/store/dm_test/dm_trd_wide_carbondata/Fact/Part0/Segment_3
17/10/30 12:19:39 ERROR [org.apache.spark.sql.execution.command.LoadTable(143) -- main]: main 
java.lang.NullPointerException
        at org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile.isDirectory(AbstractDFSCarbonFile.java:88)
        at org.apache.carbondata.core.util.CarbonUtil.deleteRecursive(CarbonUtil.java:364)
        at org.apache.carbondata.core.util.CarbonUtil.access$100(CarbonUtil.java:93)
        at org.apache.carbondata.core.util.CarbonUtil$2.run(CarbonUtil.java:326)
        at org.apache.carbondata.core.util.CarbonUtil$2.run(CarbonUtil.java:322)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
        at org.apache.carbondata.core.util.CarbonUtil.deleteFoldersAndFiles(CarbonUtil.java:322)
        at org.apache.carbondata.spark.load.CarbonLoaderUtil.recordLoadMetadata(CarbonLoaderUtil.java:331)
        at org.apache.carbondata.spark.rdd.CarbonDataRDDFactory$.updateStatus$1(CarbonDataRDDFactory.scala:595)
        at org.apache.carbondata.spark.rdd.CarbonDataRDDFactory$.loadCarbonData(CarbonDataRDDFactory.scala:1078)
        at org.apache.spark.sql.execution.command.LoadTable.processData(carbonTableSchema.scala:1046)
        at org.apache.spark.sql.execution.command.LoadTable.run(carbonTableSchema.scala:754)
        at org.apache.spark.sql.execution.command.LoadTableByInsert.processData(carbonTableSchema.scala:651)
        at org.apache.spark.sql.execution.command.LoadTableByInsert.run(carbonTableSchema.scala:637)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:67)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:180)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:65)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:619)
        at $line18.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:37)
        at $line18.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:42)
        at $line18.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:44)
        at $line18.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:46)
        at $line18.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:48)
        at $line18.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:50)
        at $line18.$read$$iw$$iw$$iw$$iw.<init>(<console>:52)
        at $line18.$read$$iw$$iw$$iw.<init>(<console>:54)
        at $line18.$read$$iw$$iw.<init>(<console>:56)
        at $line18.$read$$iw.<init>(<console>:58)
        at $line18.$read.<init>(<console>:60)
        at $line18.$read$.<init>(<console>:64)
        at $line18.$read$.<clinit>(<console>)
        at $line18.$eval$.$print$lzycompute(<console>:7)
        at $line18.$eval$.$print(<console>:6)
        at $line18.$eval.$print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)
        at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047)
        at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:638)
        at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:637)
        at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
        at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
        at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:637)
        at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:569)
        at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:565)
        at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:807)
        at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:825)
        at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:681)
        at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:395)
        at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:415)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:923)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
        at scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:97)
        at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:909)
        at org.apache.spark.repl.Main$.doMain(Main.scala:69)
        at org.apache.spark.repl.Main$.main(Main.scala:52)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:186)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:211)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
17/10/30 12:19:39 AUDIT [org.apache.spark.sql.execution.command.LoadTable(207) -- main]: [sh-hadoop-datanode-250-104.elenet.me][master][Thread-1]Dataload failure for dm_test.dm_trd_wide_carbondata. Please check the logs
java.lang.NullPointerException
  at org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile.isDirectory(AbstractDFSCarbonFile.java:88)
  at org.apache.carbondata.core.util.CarbonUtil.deleteRecursive(CarbonUtil.java:364)
  at org.apache.carbondata.core.util.CarbonUtil.access$100(CarbonUtil.java:93)
  at org.apache.carbondata.core.util.CarbonUtil$2.run(CarbonUtil.java:326)
  at org.apache.carbondata.core.util.CarbonUtil$2.run(CarbonUtil.java:322)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:422)
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
  at org.apache.carbondata.core.util.CarbonUtil.deleteFoldersAndFiles(CarbonUtil.java:322)
  at org.apache.carbondata.spark.load.CarbonLoaderUtil.recordLoadMetadata(CarbonLoaderUtil.java:331)
  at org.apache.carbondata.spark.rdd.CarbonDataRDDFactory$.updateStatus$1(CarbonDataRDDFactory.scala:595)
  at org.apache.carbondata.spark.rdd.CarbonDataRDDFactory$.loadCarbonData(CarbonDataRDDFactory.scala:1078)
  at org.apache.spark.sql.execution.command.LoadTable.processData(carbonTableSchema.scala:1046)
  at org.apache.spark.sql.execution.command.LoadTable.run(carbonTableSchema.scala:754)
  at org.apache.spark.sql.execution.command.LoadTableByInsert.processData(carbonTableSchema.scala:651)
  at org.apache.spark.sql.execution.command.LoadTableByInsert.run(carbonTableSchema.scala:637)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:67)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:180)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:65)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:619)
  ... 51 elided


scala> carbon.sql("insert overwrite table dm_test.dm_trd_wide_carbondata select * from dm.dm_trd_wide where dt = '2017-10-10' ").collect
17/10/30 12:19:44 AUDIT [org.apache.carbondata.spark.rdd.CarbonDataRDDFactory$(207) -- main]: [sh-hadoop-datanode-250-104.elenet.me][master][Thread-1]Data load request has been received for table dm_test.dm_trd_wide_carbondata
[Stage 1:>                                                      (0 + 434) / 434]






Best regards!
Yuhai Cen


On 2017-10-30 12:17, 岑玉海 <ce...@163.com> wrote:
Why do users need to set "carbon.number.of.cores.while.loading"? Because the loading process is too slow. The key point is that it is slow!
I also have the problem that I have to increase the value of "spark.dynamicAllocation.maxExecutors" when the data becomes larger...


Best regards!
Yuhai Cen


On 2017-10-28 16:11, Jacky Li <ja...@qq.com> wrote:

I have thought about this issue, and I find there is actually a usability problem that Carbon can improve.

The problem is that currently the user is forced to set the “carbon.number.of.cores.while.loading” carbon property before loading; this creates overhead for the user and the usability is not good.

To solve this, the following changes should be done:
1. Carbon should take the minimum of the “carbon.number.of.cores.while.loading” property and “spark.executor.cores” in the Spark conf, instead of taking “carbon.number.of.cores.while.loading” directly.
2. The default value of “carbon.number.of.cores.while.loading” should be large, so that Carbon has a higher chance of taking “spark.executor.cores” as the number of loading cores. The current default value is 2, which I think is too small.

Regards,
Jacky


> On 2017-10-28, at 9:51 AM, Jacky Li <ja...@qq.com> wrote:
> 
> Hi,
> 
> I am not getting the intention behind this proposal. Is it because of the loading failure? If yes, we should find out why the loading failed.
> If not, then what is the intention?
> 
> Actually I think the “carbon.number.of.cores.while.loading” property should be marked as obsolete.
> GLOBAL_SORT and NO_SORT should use the Spark default behavior.
> LOCAL_SORT and BATCH_SORT should use “sparkSession.sparkContext.defaultParallelism” as the number of cores to do local sorting.
> 
> 
> Regards,
> Jacky 
> 
>> On 2017-10-27, at 8:43 AM, cenyuhai11 <ce...@163.com> wrote:
>> 
>> When I insert data into CarbonData from another table, I have to do the
>> following:
>> 1. select count(1) from table1
>> and then
>> 2. insert into table table1 select * from table1
>>
>> Why do I have to execute "select count(1) from table1" first?
>> Because the number of tasks is computed by CarbonData, and it is related to
>> how many executor hosts we have at that moment!
>>
>> I don't think this is the right way. We should let Spark control the number
>> of tasks.
>> Setting the parameter "mapred.max.split.size" is a common way to adjust the
>> number of tasks.
>>
>> Even when I do step 2, some tasks still fail, which increases the
>> insert time.
>>
>> So I suggest that we do not adjust the number of tasks and just use the
>> default behavior of Spark.
>> And then, if there are small files, add a fast merge job (merging data at the
>> blocklet level, just as )
>>
>> We also need to set the default value of
>> "carbon.number.of.cores.while.loading" to 1.
>> 
>> 
>> 
>> 
>> 
>> 
>> --
>> Sent from: http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/
> 


Re: [DISCUSSION] Improve insert process

Posted by 岑玉海 <ce...@163.com>.
This still cannot resolve my problem. The number of tasks is not stable; sometimes there are only 10 tasks... Can 10 tasks work? I think maybe it can, but it will take a long time...




Best regards!
Yuhai Cen


On 2017-10-30 13:21, Jacky Li <ja...@qq.com> wrote:




On 2017-10-30, at 9:47 AM, 岑玉海 <ce...@163.com> wrote:


Why do users need to set "carbon.number.of.cores.while.loading"? Because the loading process is too slow. The key point is that it is slow!


Yes, I agree. What I am suggesting in the other mail is that Carbon should take care of it if the user does not set “carbon.number.of.cores.while.loading”:
1. Carbon should take the minimum of the “carbon.number.of.cores.while.loading” property and “spark.executor.cores” in the Spark conf, instead of taking “carbon.number.of.cores.while.loading” directly.
2. The default value of “carbon.number.of.cores.while.loading” should be large, so that Carbon has a higher chance of taking “spark.executor.cores” as the number of loading cores. The current default value is 2, which I think is too small.


I think this change can solve your problem.


I also have the problem that I have to increase the value of "spark.dynamicAllocation.maxExecutors" when the data becomes larger...


Best regards!
Yuhai Cen


On 2017-10-28 16:11, Jacky Li <ja...@qq.com> wrote:

I have thought about this issue, and I find there is actually a usability problem that Carbon can improve.

The problem is that currently the user is forced to set the “carbon.number.of.cores.while.loading” carbon property before loading; this creates overhead for the user and the usability is not good.

To solve this, the following changes should be done:
1. Carbon should take the minimum of the “carbon.number.of.cores.while.loading” property and “spark.executor.cores” in the Spark conf, instead of taking “carbon.number.of.cores.while.loading” directly.
2. The default value of “carbon.number.of.cores.while.loading” should be large, so that Carbon has a higher chance of taking “spark.executor.cores” as the number of loading cores. The current default value is 2, which I think is too small.

Regards, 
Jacky 


> On 2017-10-28, at 9:51 AM, Jacky Li <ja...@qq.com> wrote:
>  
> Hi, 
>  
> I am not getting the intention behind this proposal. Is it because of the loading failure? If yes, we should find out why the loading failed. 
> If not, then what is the intention? 
>  
> Actually I think the “carbon.number.of.cores.while.loading” property should be marked as obsolete. 
> GLOBAL_SORT and NO_SORT should use the Spark default behavior.
> LOCAL_SORT and BATCH_SORT should use “sparkSession.sparkContext.defaultParallelism” as the number of cores to do local sorting.
>  
>  
> Regards, 
> Jacky  
>  
>> On 2017-10-27, at 8:43 AM, cenyuhai11 <ce...@163.com> wrote:
>>  
>> When I insert data into CarbonData from another table, I have to do the
>> following:
>> 1. select count(1) from table1
>> and then
>> 2. insert into table table1 select * from table1
>>
>> Why do I have to execute "select count(1) from table1" first?
>> Because the number of tasks is computed by CarbonData, and it is related to
>> how many executor hosts we have at that moment!
>>
>> I don't think this is the right way. We should let Spark control the number
>> of tasks.
>> Setting the parameter "mapred.max.split.size" is a common way to adjust the
>> number of tasks.
>>
>> Even when I do step 2, some tasks still fail, which increases the
>> insert time.
>>
>> So I suggest that we do not adjust the number of tasks and just use the
>> default behavior of Spark.
>> And then, if there are small files, add a fast merge job (merging data at the
>> blocklet level, just as )
>>
>> We also need to set the default value of
>> "carbon.number.of.cores.while.loading" to 1.
>>  
>>  
>>  
>>  
>>  
>>  
>> -- 
>> Sent from: http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/ 
>  




Re: [DISCUSSION] Improve insert process

Posted by Jacky Li <ja...@qq.com>.

> On 2017-10-30, at 9:47 AM, 岑玉海 <ce...@163.com> wrote:
> 
> Why do users need to set "carbon.number.of.cores.while.loading"? Because the loading process is too slow. The key point is that it is slow!

Yes, I agree. What I am suggesting in the other mail is that Carbon should take care of it if the user does not set “carbon.number.of.cores.while.loading”:
1. Carbon should take the minimum of the “carbon.number.of.cores.while.loading” property and “spark.executor.cores” in the Spark conf, instead of taking “carbon.number.of.cores.while.loading” directly.
2. The default value of “carbon.number.of.cores.while.loading” should be large, so that Carbon has a higher chance of taking “spark.executor.cores” as the number of loading cores. The current default value is 2, which I think is too small.

I think this change can solve your problem.

> I also have the problem that I have to increase the value of "spark.dynamicAllocation.maxExecutors" when the data becomes larger...
> 
> Best regards!
> Yuhai Cen
> 
On 2017-10-28 16:11, Jacky Li <ja...@qq.com> wrote:
> 
> I have thought about this issue, and I find there is actually a usability problem that Carbon can improve.
> 
> The problem is that currently the user is forced to set the “carbon.number.of.cores.while.loading” carbon property before loading; this creates overhead for the user and the usability is not good.
> 
> To solve this, the following changes should be done:
> 1. Carbon should take the minimum of the “carbon.number.of.cores.while.loading” property and “spark.executor.cores” in the Spark conf, instead of taking “carbon.number.of.cores.while.loading” directly.
> 2. The default value of “carbon.number.of.cores.while.loading” should be large, so that Carbon has a higher chance of taking “spark.executor.cores” as the number of loading cores. The current default value is 2, which I think is too small.
> 
> Regards, 
> Jacky 
> 
> 
> > On 2017-10-28, at 9:51 AM, Jacky Li <ja...@qq.com> wrote:
> >  
> > Hi, 
> >  
> > I am not getting the intention behind this proposal. Is it because of the loading failure? If yes, we should find out why the loading failed. 
> > If not, then what is the intention? 
> >  
> > Actually I think the “carbon.number.of.cores.while.loading” property should be marked as obsolete. 
> > GLOBAL_SORT and NO_SORT should use the Spark default behavior.
> > LOCAL_SORT and BATCH_SORT should use “sparkSession.sparkContext.defaultParallelism” as the number of cores to do local sorting.
> >  
> >  
> > Regards, 
> > Jacky  
> >  
> >> On 2017-10-27, at 8:43 AM, cenyuhai11 <ce...@163.com> wrote:
> >>  
> >> When I insert data into CarbonData from another table, I have to do the
> >> following:
> >> 1. select count(1) from table1
> >> and then
> >> 2. insert into table table1 select * from table1
> >>
> >> Why do I have to execute "select count(1) from table1" first?
> >> Because the number of tasks is computed by CarbonData, and it is related to
> >> how many executor hosts we have at that moment!
> >>
> >> I don't think this is the right way. We should let Spark control the number
> >> of tasks.
> >> Setting the parameter "mapred.max.split.size" is a common way to adjust the
> >> number of tasks.
> >>
> >> Even when I do step 2, some tasks still fail, which increases the
> >> insert time.
> >>
> >> So I suggest that we do not adjust the number of tasks and just use the
> >> default behavior of Spark.
> >> And then, if there are small files, add a fast merge job (merging data at the
> >> blocklet level, just as )
> >>
> >> We also need to set the default value of
> >> "carbon.number.of.cores.while.loading" to 1.
> >>  
> >>  
> >>  
> >>  
> >>  
> >>  
> >> -- 
> >> Sent from: http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/ 
> >  
> 


Re: [DISCUSSION] Improve insert process

Posted by 岑玉海 <ce...@163.com>.
Why do users need to set "carbon.number.of.cores.while.loading"? Because the loading process is too slow. The key point is that it is slow!
I also have the problem that I have to increase the value of "spark.dynamicAllocation.maxExecutors" when the data becomes larger...
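
For illustration only, one way to raise that cap is through the Spark session
configuration; this assumes dynamic allocation is enabled for the session, and
the value 200 is purely an example, not a recommendation.

    // Standard Spark settings, applied before the session is created.
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder()
      .config("spark.dynamicAllocation.enabled", "true")
      .config("spark.dynamicAllocation.maxExecutors", "200") // example value only
      .getOrCreate()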


Best regards!
Yuhai Cen


On 2017-10-28 16:11, Jacky Li <ja...@qq.com> wrote:

I have thought about this issue, and I find there is actually a usability problem that Carbon can improve.

The problem is that currently the user is forced to set the “carbon.number.of.cores.while.loading” carbon property before loading; this creates overhead for the user and the usability is not good.

To solve this, the following changes should be done:
1. Carbon should take the minimum of the “carbon.number.of.cores.while.loading” property and “spark.executor.cores” in the Spark conf, instead of taking “carbon.number.of.cores.while.loading” directly.
2. The default value of “carbon.number.of.cores.while.loading” should be large, so that Carbon has a higher chance of taking “spark.executor.cores” as the number of loading cores. The current default value is 2, which I think is too small.

Regards,
Jacky


> On 2017-10-28, at 9:51 AM, Jacky Li <ja...@qq.com> wrote:
> 
> Hi,
> 
> I am not getting the intention behind this proposal. Is it because of the loading failure? If yes, we should find out why the loading failed.
> If not, then what is the intention?
> 
> Actually I think the “carbon.number.of.cores.while.loading” property should be marked as obsolete.
> GLOBAL_SORT and NO_SORT should use the Spark default behavior.
> LOCAL_SORT and BATCH_SORT should use “sparkSession.sparkContext.defaultParallelism” as the number of cores to do local sorting.
> 
> 
> Regards,
> Jacky 
> 
>> On 2017-10-27, at 8:43 AM, cenyuhai11 <ce...@163.com> wrote:
>> 
>> When I insert data into CarbonData from another table, I have to do the
>> following:
>> 1. select count(1) from table1
>> and then
>> 2. insert into table table1 select * from table1
>>
>> Why do I have to execute "select count(1) from table1" first?
>> Because the number of tasks is computed by CarbonData, and it is related to
>> how many executor hosts we have at that moment!
>>
>> I don't think this is the right way. We should let Spark control the number
>> of tasks.
>> Setting the parameter "mapred.max.split.size" is a common way to adjust the
>> number of tasks.
>>
>> Even when I do step 2, some tasks still fail, which increases the
>> insert time.
>>
>> So I suggest that we do not adjust the number of tasks and just use the
>> default behavior of Spark.
>> And then, if there are small files, add a fast merge job (merging data at the
>> blocklet level, just as )
>>
>> We also need to set the default value of
>> "carbon.number.of.cores.while.loading" to 1.
>> 
>> 
>> 
>> 
>> 
>> 
>> --
>> Sent from: http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/
> 


Re: [DISCUSSION] Improve insert process

Posted by Jacky Li <ja...@qq.com>.
I have thought about this issue, and I find there is actually a usability problem that Carbon can improve.

The problem is that currently the user is forced to set the “carbon.number.of.cores.while.loading” carbon property before loading; this creates overhead for the user and the usability is not good.

To solve this, the following changes should be done (a sketch follows below):
1. Carbon should take the minimum of the “carbon.number.of.cores.while.loading” property and “spark.executor.cores” in the Spark conf, instead of taking “carbon.number.of.cores.while.loading” directly.
2. The default value of “carbon.number.of.cores.while.loading” should be large, so that Carbon has a higher chance of taking “spark.executor.cores” as the number of loading cores. The current default value is 2, which I think is too small.
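
For illustration only, a minimal sketch of point 1 with a hypothetical helper
name; this is not the actual CarbonData code, it just restates the proposed
rule under those assumptions.

    // Take the smaller of the carbon loading-cores property and
    // spark.executor.cores as the effective number of loading cores.
    import org.apache.spark.SparkConf
    def effectiveLoadingCores(conf: SparkConf, carbonLoadingCores: Int): Int = {
      val executorCores = conf.getInt("spark.executor.cores", 1) // Spark's default is 1
      math.min(carbonLoadingCores, executorCores)
    }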

Regards,
Jacky


> On 2017-10-28, at 9:51 AM, Jacky Li <ja...@qq.com> wrote:
> 
> Hi,
> 
> I am not getting the intention behind this proposal. Is it because of the loading failure? If yes, we should find out why the loading failed.
> If not, then what is the intention?
> 
> Actually I think the “carbon.number.of.cores.while.loading” property should be marked as obsolete.
> GLOBAL_SORT and NO_SORT should use the Spark default behavior.
> LOCAL_SORT and BATCH_SORT should use “sparkSession.sparkContext.defaultParallelism” as the number of cores to do local sorting.
> 
> 
> Regards,
> Jacky 
> 
>> On 2017-10-27, at 8:43 AM, cenyuhai11 <ce...@163.com> wrote:
>> 
>> When I insert data into CarbonData from another table, I have to do the
>> following:
>> 1. select count(1) from table1
>> and then
>> 2. insert into table table1 select * from table1
>>
>> Why do I have to execute "select count(1) from table1" first?
>> Because the number of tasks is computed by CarbonData, and it is related to
>> how many executor hosts we have at that moment!
>>
>> I don't think this is the right way. We should let Spark control the number
>> of tasks.
>> Setting the parameter "mapred.max.split.size" is a common way to adjust the
>> number of tasks.
>>
>> Even when I do step 2, some tasks still fail, which increases the
>> insert time.
>>
>> So I suggest that we do not adjust the number of tasks and just use the
>> default behavior of Spark.
>> And then, if there are small files, add a fast merge job (merging data at the
>> blocklet level, just as )
>>
>> We also need to set the default value of
>> "carbon.number.of.cores.while.loading" to 1.
>> 
>> 
>> 
>> 
>> 
>> 
>> --
>> Sent from: http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/
> 


Re: [DISCUSSION] Improve insert process

Posted by 岑玉海 <ce...@163.com>.
Hi, Jacky
    Because it can't take full advantage of my resources and it fails randomly, I want to make the insert process faster and more stable.
    Yes, you are right, we should also find out why the loading failed.
    I don't think LOCAL_SORT and BATCH_SORT should use “sparkSession.sparkContext.defaultParallelism”; using the Spark default behavior is more reasonable, because it is based on the data size. Users don't need to adjust it when the data becomes larger.




Best regards!
Yuhai Cen


On 2017-10-28 12:21, Jacky Li <ja...@qq.com> wrote:
Hi,

I am not getting the intention behind this proposal. Is it because of the loading failure? If yes, we should find out why the loading failed.
If not, then what is the intention?

Actually I think the “carbon.number.of.cores.while.loading” property should be marked as obsolete.
GLOBAL_SORT and NO_SORT should use the Spark default behavior.
LOCAL_SORT and BATCH_SORT should use “sparkSession.sparkContext.defaultParallelism” as the number of cores to do local sorting.


Regards,
Jacky 

> On 2017-10-27, at 8:43 AM, cenyuhai11 <ce...@163.com> wrote:
> 
> When I insert data into CarbonData from another table, I have to do the
> following:
> 1. select count(1) from table1
> and then
> 2. insert into table table1 select * from table1
>
> Why do I have to execute "select count(1) from table1" first?
> Because the number of tasks is computed by CarbonData, and it is related to
> how many executor hosts we have at that moment!
>
> I don't think this is the right way. We should let Spark control the number
> of tasks.
> Setting the parameter "mapred.max.split.size" is a common way to adjust the
> number of tasks.
>
> Even when I do step 2, some tasks still fail, which increases the
> insert time.
>
> So I suggest that we do not adjust the number of tasks and just use the
> default behavior of Spark.
> And then, if there are small files, add a fast merge job (merging data at the
> blocklet level, just as )
>
> We also need to set the default value of
> "carbon.number.of.cores.while.loading" to 1.
> 
> 
> 
> 
> 
> 
> --
> Sent from: http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/




Re: [DISCUSSION] Improve insert process

Posted by Jacky Li <ja...@qq.com>.
Hi,

I am not getting the intention behind this proposal. Is it because of the loading failure? If yes, we should find out why the loading failed.
If not, then what is the intention?

Actually I think the “carbon.number.of.cores.while.loading” property should be marked as obsolete.
GLOBAL_SORT and NO_SORT should use the Spark default behavior.
LOCAL_SORT and BATCH_SORT should use “sparkSession.sparkContext.defaultParallelism” as the number of cores to do local sorting (a small sketch follows below).
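
For illustration only, a sketch of that idea with a hypothetical helper name;
it just shows where the core count would come from under this proposal.

    // Derive the local-sort parallelism from Spark's own default parallelism
    // instead of a separate carbon property.
    import org.apache.spark.sql.SparkSession
    def localSortCores(spark: SparkSession): Int =
      spark.sparkContext.defaultParallelism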


Regards,
Jacky 

> On 2017-10-27, at 8:43 AM, cenyuhai11 <ce...@163.com> wrote:
> 
> When I insert data into CarbonData from another table, I have to do the
> following:
> 1. select count(1) from table1
> and then
> 2. insert into table table1 select * from table1
>
> Why do I have to execute "select count(1) from table1" first?
> Because the number of tasks is computed by CarbonData, and it is related to
> how many executor hosts we have at that moment!
>
> I don't think this is the right way. We should let Spark control the number
> of tasks.
> Setting the parameter "mapred.max.split.size" is a common way to adjust the
> number of tasks.
>
> Even when I do step 2, some tasks still fail, which increases the
> insert time.
>
> So I suggest that we do not adjust the number of tasks and just use the
> default behavior of Spark.
> And then, if there are small files, add a fast merge job (merging data at the
> blocklet level, just as )
>
> We also need to set the default value of
> "carbon.number.of.cores.while.loading" to 1.
> 
> 
> 
> 
> 
> 
> --
> Sent from: http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/