Posted to dev@griffin.apache.org by Dhiren Sangani <dh...@enquero.com> on 2018/12/03 07:33:38 UTC

Accuracy measure fails on large dataset

Hi Dev Team,

I am facing an issue with the Accuracy measure while running a job on a larger dataset (1B records).
I have source and target tables in Hive and am using a simple accuracy rule, as below:

source.column1 = target.column1 AND source.column2 = target.column2

I get the exception below while running the job.

18/11/30 21:44:12 ERROR MetricWriteStep: get metric accuracy fails
java.util.concurrent.TimeoutException: Futures timed out after [3600 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
        at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:136)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:144)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:140)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:140)
        at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.doExecute(BroadcastNestedLoopJoinExec.scala:343)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
        at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
        at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
        at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
        at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:89)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.MapPartitionsExec.doExecute(objects.scala:185)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
        at org.apache.spark.sql.execution.SerializeFromObjectExec.inputRDDs(objects.scala:110)
        at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
        at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:294)
        at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3278)
        at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2727)
        at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2727)
        at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3259)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258)
        at org.apache.spark.sql.Dataset.collect(Dataset.scala:2727)
        at org.apache.griffin.measure.step.write.MetricWriteStep.getMetricMaps(MetricWriteStep.scala:86)
        at org.apache.griffin.measure.step.write.MetricWriteStep.execute(MetricWriteStep.scala:46)
        at org.apache.griffin.measure.step.SeqDQStep$$anonfun$execute$1.apply(SeqDQStep.scala:37)
        at org.apache.griffin.measure.step.SeqDQStep$$anonfun$execute$1.apply(SeqDQStep.scala:36)
        at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
        at scala.collection.immutable.List.foldLeft(List.scala:84)
        at org.apache.griffin.measure.step.SeqDQStep.execute(SeqDQStep.scala:36)
        at org.apache.griffin.measure.job.DQJob$$anonfun$execute$1.apply(DQJob.scala:31)
        at org.apache.griffin.measure.job.DQJob$$anonfun$execute$1.apply(DQJob.scala:30)
        at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
        at scala.collection.immutable.List.foldLeft(List.scala:84)
        at org.apache.griffin.measure.job.DQJob.execute(DQJob.scala:30)
        at org.apache.griffin.measure.launch.batch.BatchDQApp$$anonfun$run$1.apply$mcZ$sp(BatchDQApp.scala:103)
        at org.apache.griffin.measure.launch.batch.BatchDQApp$$anonfun$run$1.apply(BatchDQApp.scala:67)
        at org.apache.griffin.measure.launch.batch.BatchDQApp$$anonfun$run$1.apply(BatchDQApp.scala:67)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.griffin.measure.launch.batch.BatchDQApp.run(BatchDQApp.scala:67)
        at org.apache.griffin.measure.Application$.main(Application.scala:88)
        at org.apache.griffin.measure.Application.main(Application.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$4.run(ApplicationMaster.scala:721)
18/11/30 21:44:12 INFO MetricWriteStep: metricMaps => List()

I tried running the job with half of the dataset (500M records) and it ran successfully.
Below is the metric log.
18/11/30 20:37:31 INFO MetricWriteStep: metricMaps => WrappedArray(Map(total -> 508880897, miss -> 0, matched -> 508880897, matchedFraction -> 1.0))

But if I try to run it even with 700M records, it fails.
18/11/30 21:44:12 ERROR MetricWriteStep: get metric accuracy fails
18/11/30 21:44:12 INFO MetricWriteStep: metricMaps => List()

Is this related to the join query used inside AccuracyExpr2DQSteps.scala to calculate the miss records?

Any pointers will be appreciated.

Thanks,
Dhiren
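
Note for readers unfamiliar with the metric log quoted above: the sketch below shows, in plain Python, what the four fields (total, miss, matched, matchedFraction) mean for a rule like the one in this message. It is an illustration only and not Griffin's implementation, which runs as Spark SQL; the function name `accuracy_metric` and the sample rows are made up for the example.

```python
def accuracy_metric(source_rows, target_rows, keys):
    """Count source rows that have a target row agreeing on every key column."""
    # Collect the key tuples that exist in the target.
    target_keys = {tuple(row[k] for k in keys) for row in target_rows}
    total = len(source_rows)
    # A source row is a "miss" when no target row matches it on all keys.
    miss = sum(
        1 for row in source_rows
        if tuple(row[k] for k in keys) not in target_keys
    )
    matched = total - miss
    return {
        "total": total,
        "miss": miss,
        "matched": matched,
        # Returning None for an empty source is a choice made for this sketch.
        "matchedFraction": matched / total if total else None,
    }


# Hypothetical sample data: the second source row has no match in the target.
source = [
    {"column1": 1, "column2": "a"},
    {"column1": 2, "column2": "b"},
    {"column1": 3, "column2": "c"},
]
target = [
    {"column1": 1, "column2": "a"},
    {"column1": 2, "column2": "x"},
    {"column1": 3, "column2": "c"},
]

result = accuracy_metric(source, target, ["column1", "column2"])
print(result)
```

In the successful 500M run above, miss is 0, so matched equals total and matchedFraction is 1.0.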


RE: Accuracy measure fails on large dataset

Posted by Dhiren Sangani <dh...@enquero.com>.
Hi Lionel/Nick,

The exception was thrown while reading the table from the cache, so I tried running the job without the cache.
The job succeeded without any exceptions.
I am not sure why it fails when I use the cache.

I will try the property "spark.broadcast.blockSize" together with the cache to see if that works.

Earlier I tried disabling broadcast with the property below in spark-defaults.conf, but the join still used broadcast.
spark.sql.autoBroadcastJoinThreshold -1

Thanks,
Dhiren

From: Lionel Liu <li...@apache.org>
Sent: Tuesday, December 4, 2018 5:57 PM
To: Dhiren Sangani <dh...@enquero.com>
Cc: dev@griffin.apache.org; dev@griffin.incubator.apache.org; chemikadze@gmail.com
Subject: Re: Accuracy measure fails on large dataset

Hi Dhiren,

Based on Nick's advice, you can try adjusting the property "spark.broadcast.blockSize" in the spark.config field of env.json.

Thanks,
Lionel

On Wed, Dec 5, 2018 at 4:07 AM Nick Sokolov <ch...@gmail.com> wrote:
The error looks like Spark is trying to do a broadcast join, but the data size for the broadcast is too large. It makes sense to try adjusting the Spark properties in Griffin to disable auto broadcast join or to adjust the broadcast join threshold.
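
Note: a rough back-of-the-envelope estimate can show why a broadcast of this data would be far too large. The sketch below uses the row and column counts reported elsewhere in this thread; the average of 8 bytes per value is purely an assumption for illustration, and it is not known from the thread which side of the join Spark tried to broadcast. The only Spark fact used is the documented default of 10 MB for spark.sql.autoBroadcastJoinThreshold.

```python
ROWS = 700_000_000             # record count that failed, as reported in the thread
ALL_COLUMNS = 23               # column count reported in the thread
RULE_COLUMNS = 2               # columns referenced by the accuracy rule
ASSUMED_BYTES_PER_VALUE = 8    # assumption: average size of one column value
DEFAULT_THRESHOLD = 10 * 1024 * 1024  # default autoBroadcastJoinThreshold (10 MB)


def estimated_bytes(rows, columns, bytes_per_value=ASSUMED_BYTES_PER_VALUE):
    """Very rough table size: rows x columns x average value size."""
    return rows * columns * bytes_per_value


full = estimated_bytes(ROWS, ALL_COLUMNS)
narrow = estimated_bytes(ROWS, RULE_COLUMNS)

print(f"all 23 columns : {full / 1e9:.1f} GB")
print(f"2 rule columns : {narrow / 1e9:.1f} GB")
print(f"full table exceeds default threshold: {full > DEFAULT_THRESHOLD}")
```

Even with only the two rule columns, the estimate is orders of magnitude above the default threshold, so a plan that broadcasts either full table is unlikely to finish.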
On Mon, Dec 3, 2018, 9:20 AM Dhiren Sangani <dh...@enquero.com> wrote:
Hi Lionel,

Thanks for the reply.

Below are the parameters being used while submitting the job.

"numExecutors": 10,
"executorCores": 6,
"driverMemory": "10g",
"executorMemory": "7g"

I have 2 data nodes configured with YARN, each with 80G of memory. Each server has 128G of physical RAM.
My data has no hourly partitions; it is partitioned at day level only.

I tried removing the extra columns from both tables (source and target) using spark-sql rules, and after that the job succeeded.
There are 23 columns in the table, and source and target have the same number of columns.

But if I use the data with all the columns, the job fails.

Thanks,
Dhiren
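
Note: the submit parameters above can be turned into an approximate YARN memory request. The sketch assumes Spark's default per-container memory overhead on YARN (10% of the heap, with a 384 MB minimum) and ignores YARN's rounding of container sizes to its allocation increment; the helper name `container_mb` is made up for the example.

```python
MIN_OVERHEAD_MB = 384   # Spark-on-YARN minimum memory overhead per container
OVERHEAD_DIVISOR = 10   # default overhead is 10% of the requested heap


def container_mb(heap_gb):
    """Approximate YARN container size in MB for a heap size given in GB."""
    heap_mb = heap_gb * 1024
    overhead_mb = max(MIN_OVERHEAD_MB, heap_mb // OVERHEAD_DIVISOR)
    return heap_mb + overhead_mb


# Values taken from the submit parameters quoted in this message.
num_executors, executor_cores = 10, 6
executor_memory_gb, driver_memory_gb = 7, 10

executors_mb = num_executors * container_mb(executor_memory_gb)
driver_mb = container_mb(driver_memory_gb)
total_mb = executors_mb + driver_mb
yarn_mb = 2 * 80 * 1024  # two data nodes with 80G each available to YARN

print(f"executor containers: {executors_mb} MB")
print(f"driver container   : {driver_mb} MB")
print(f"total requested    : {total_mb} MB of {yarn_mb} MB")
print(f"total cores        : {num_executors * executor_cores}")
```

Under these assumptions the job requests about 88 GB (90104 MB), a little over half of the 160 GB that YARN can hand out, so there appears to be headroom for more or larger executors.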

From: Lionel Liu <li...@apache.org>
Sent: Monday, December 3, 2018 1:35 AM
To: dev@griffin.apache.org; Dhiren Sangani <dh...@enquero.com>
Cc: dev@griffin.incubator.apache.org
Subject: Re: Accuracy measure fails on large dataset

Hi Dhiren,

How many resources are you using when you submit this job?
For large-scale data, the simple solution is to use more resources for the calculation job.
With limited resources, a common solution is to partition your data by date or hour so that each partition is smaller, then calculate on part of the data each time.

Thanks,
Lionel
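
Note: the partitioning suggestion above can be sketched in plain Python as computing the counts per partition and then adding them up. This is only an illustration, not Griffin code; in practice each partition would be a separate job run. It assumes a source row and its matching target row always carry the same partition value, and the column name `dt`, the function names, and the sample rows are hypothetical.

```python
from collections import defaultdict


def accuracy_by_partition(source_rows, target_rows, keys, partition_col):
    """Return {partition: {"total": n, "miss": m}} for the source rows."""
    # Group target key tuples by partition so each lookup set stays small.
    target_keys = defaultdict(set)
    for row in target_rows:
        target_keys[row[partition_col]].add(tuple(row[k] for k in keys))

    per_partition = defaultdict(lambda: {"total": 0, "miss": 0})
    for row in source_rows:
        stats = per_partition[row[partition_col]]
        stats["total"] += 1
        if tuple(row[k] for k in keys) not in target_keys[row[partition_col]]:
            stats["miss"] += 1
    return dict(per_partition)


def combine(per_partition):
    """Add up the per-partition counts into one overall metric."""
    total = sum(p["total"] for p in per_partition.values())
    miss = sum(p["miss"] for p in per_partition.values())
    matched = total - miss
    return {
        "total": total,
        "miss": miss,
        "matched": matched,
        "matchedFraction": matched / total if total else None,
    }


source = [
    {"dt": "2018-11-29", "column1": 1, "column2": "a"},
    {"dt": "2018-11-29", "column1": 2, "column2": "b"},
    {"dt": "2018-11-30", "column1": 3, "column2": "c"},
]
target = [
    {"dt": "2018-11-29", "column1": 1, "column2": "a"},
    {"dt": "2018-11-30", "column1": 3, "column2": "c"},
]

parts = accuracy_by_partition(source, target, ["column1", "column2"], "dt")
overall = combine(parts)
print(parts)
print(overall)
```

The counts add up across partitions only because of the same-partition assumption; if a record could match a target row from a different day, the per-partition misses would be overstated.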





Re: Accuracy measure fails on large dataset

Posted by Lionel Liu <li...@apache.org>.
Hi Dhiren,

Based on Nick's advice, you can try adjusting the property "spark.broadcast.blockSize" in the spark.config field of env.json.

Thanks,
Lionel
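
Note: the sketch below generates the kind of env.json fragment this message seems to describe. It assumes Spark properties sit under a "config" object inside the "spark" section of env.json, which is how "spark.config field" is read here; check the env.json shipped with your Griffin version before relying on that layout. The value "8m" is an arbitrary illustration (Spark's documented default for spark.broadcast.blockSize is 4m), and "-1" is the documented way to disable automatic broadcast joins.

```python
import json

# Assumed layout: Spark properties nested under "spark" -> "config".
env_fragment = {
    "spark": {
        "config": {
            # -1 disables automatic broadcast joins based on table size.
            "spark.sql.autoBroadcastJoinThreshold": "-1",
            # Illustrative value only; the Spark default is 4m.
            "spark.broadcast.blockSize": "8m",
        }
    }
}

rendered = json.dumps(env_fragment, indent=2)
print(rendered)
```

Merging these keys into the existing "spark" section, rather than replacing it, keeps any other settings already present in env.json. Note that the earlier report in this thread found the join still used broadcast after setting the threshold to -1 in the Spark defaults file, so this setting may not be sufficient on its own.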

On Wed, Dec 5, 2018 at 4:07 AM Nick Sokolov <ch...@gmail.com> wrote:

> Error looks like Spark is trying to do broadcast join, but data size for
> broadcast is too large. It makes sense to try adjusting spark properties in
> Griffin to disable auto broadcast join or adjusting broadcast join
> threshold.
>
> On Mon, Dec 3, 2018, 9:20 AM Dhiren Sangani <dh...@enquero.com>
> wrote:
>
>> Hi Lionel,
>>
>> Thanks for the reply.
>>
>> Below are the parameters being used while submitting the job.
>>
>> "numExecutors": 10,
>> "executorCores": 6,
>> "driverMemory": "10g",
>> "executorMemory": "7g"
>>
>> I have 2 Data nodes configured with Yarn each with 80G memory. Each
>> server has 128G physical RAM.
>> I don’t have hourly level partition in my data. It’s day level only.
>>
>> I tried removing extra columns from both the table (source, target) using
>> spark-sql rules and after that job got succeeded.
>> There are 23 columns in table and both source and target has same number
>> of columns.
>>
>> But If I use data with all the columns, it’s failing.
>>
>> Thanks,
>> Dhiren
>>
>> From: Lionel Liu <li...@apache.org>
>> Sent: Monday, December 3, 2018 1:35 AM
>> To: dev@griffin.apache.org; Dhiren Sangani <dh...@enquero.com>
>> Cc: dev@griffin.incubator.apache.org
>> Subject: Re: Accuracy measure fails on large dataset
>>
>> Hi Dhiren,
>>
>> How many resources are your using when submit this job?
>> For large scale data, the simple solution is to use more resources for
>> the calculation job.
>> For limited resources, a common solution is to partition your data by
>> date or hour, to make it smaller in each partition, then you can calculate
>> the partial data each time.
>>
>> Thanks,
>> Lionel
>>
>>
>>
>>
>> On Mon, Dec 3, 2018 at 3:33 PM Dhiren Sangani <dhiren.sangani@enquero.com
>> <ma...@enquero.com>> wrote:
>> Hi Dev Team,
>>
>> I am facing issue with Accuracy measure while running job on larger
>> dataset (1B records).
>> I have source and target tables in Hive and using simple accuracy rule as
>> below:
>>
>> Source.Column1 = target.column1 AND source.column2 = target.column2
>>
>> Getting below exception while running the job.
>>
>> 18/11/30 21:44:12 ERROR MetricWriteStep: get metric accuracy fails
>> java.util.concurrent.TimeoutException: Futures timed out after [3600
>> seconds]
>>         at
>> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>         at
>> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>         at
>> org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
>>         at
>> org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:136)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:144)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:140)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>>         at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>         at
>> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>         at
>> org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:140)
>>         at
>> org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.doExecute(BroadcastNestedLoopJoinExec.scala:343)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>>         at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>         at
>> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>         at
>> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>         at
>> org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
>>         at
>> org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>>         at
>> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>>        at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>         at
>> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>         at
>> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>         at
>> org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
>>         at
>> org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>>         at
>> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>>         at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>         at
>> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>         at
>> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>         at
>> org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:89)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>>         at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>         at
>> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>         at
>> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>         at
>> org.apache.spark.sql.execution.MapPartitionsExec.doExecute(objects.scala:185)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>>         at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>         at
>> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>         at
>> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>         at
>> org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
>>         at
>> org.apache.spark.sql.execution.SerializeFromObjectExec.inputRDDs(objects.scala:110)
>>         at
>> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>>         at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>         at
>> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>         at
>> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>         at
>> org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
>>         at
>> org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:294)
>>         at org.apache.spark.sql.Dataset.org<
>> http://org.apache.spark.sql.Dataset.org
>> >$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3278)
>>         at
>> org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2727)
>>         at
>> org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2727)
>>         at
>> org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3259)
>>         at
>> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
>>         at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258)
>>         at org.apache.spark.sql.Dataset.collect(Dataset.scala:2727)
>>         at
>> org.apache.griffin.measure.step.write.MetricWriteStep.getMetricMaps(MetricWriteStep.scala:86)
>>         at
>> org.apache.griffin.measure.step.write.MetricWriteStep.execute(MetricWriteStep.scala:46)
>>         at
>> org.apache.griffin.measure.step.SeqDQStep$$anonfun$execute$1.apply(SeqDQStep.scala:37)
>>         at
>> org.apache.griffin.measure.step.SeqDQStep$$anonfun$execute$1.apply(SeqDQStep.scala:36)
>>         at
>> scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
>>         at scala.collection.immutable.List.foldLeft(List.scala:84)
>>         at
>> org.apache.griffin.measure.step.SeqDQStep.execute(SeqDQStep.scala:36)
>>         at
>> org.apache.griffin.measure.job.DQJob$$anonfun$execute$1.apply(DQJob.scala:31)
>>         at
>> org.apache.griffin.measure.job.DQJob$$anonfun$execute$1.apply(DQJob.scala:30)
>>         at
>> scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
>>         at scala.collection.immutable.List.foldLeft(List.scala:84)
>>         at org.apache.griffin.measure.job.DQJob.execute(DQJob.scala:30)
>>         at
>> org.apache.griffin.measure.launch.batch.BatchDQApp$$anonfun$run$1.apply$mcZ$sp(BatchDQApp.scala:103)
>>         at
>> org.apache.griffin.measure.launch.batch.BatchDQApp$$anonfun$run$1.apply(BatchDQApp.scala:67)
>>         at
>> org.apache.griffin.measure.launch.batch.BatchDQApp$$anonfun$run$1.apply(BatchDQApp.scala:67)
>>         at scala.util.Try$.apply(Try.scala:192)
>>         at
>> org.apache.griffin.measure.launch.batch.BatchDQApp.run(BatchDQApp.scala:67)
>>         at
>> org.apache.griffin.measure.Application$.main(Application.scala:88)
>>         at org.apache.griffin.measure.Application.main(Application.scala)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>         at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>         at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:498)
>>         at
>> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$4.run(ApplicationMaster.scala:721)
>> 18/11/30 21:44:12 INFO MetricWriteStep: metricMaps => List()
>>
>> I tried to run the job with half of the data set (500M records) and it
>> ran successfully.
>> Below is the metric log.
>> 18/11/30 20:37:31 INFO MetricWriteStep: metricMaps =>
>> WrappedArray(Map(total -> 508880897, miss -> 0, matched -> 508880897,
>> matchedFraction -> 1.0))
>>
>> But if I try to run it even with 700M records, it fails.
>> 18/11/30 21:44:12 ERROR MetricWriteStep: get metric accuracy fails
>> 18/11/30 21:44:12 INFO MetricWriteStep: metricMaps => List()
>>
>> Is it related to the join query used inside
>> AccuracyExpr2DQSteps.scala to calculate the missed records?
>>
>> Any pointers will be appreciated.
>>
>> Thanks,
>> Dhiren
>>
>

Re: Accuracy measure fails on large dataset

Posted by Nick Sokolov <ch...@gmail.com>.
The error suggests Spark is attempting a broadcast join, but the data to
broadcast is too large. It makes sense to try adjusting the Spark properties
in Griffin, either to disable automatic broadcast joins or to change the
broadcast join threshold.
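
A minimal sketch of that suggestion, in Python. It assumes the job is submitted with a Livy-style JSON body (the numExecutors/executorCores keys quoted in this thread resemble one) and that Spark settings go in a "conf" map; whether a given Griffin deployment passes them this way should be verified. spark.sql.autoBroadcastJoinThreshold is a standard Spark SQL property, and -1 disables size-based automatic broadcast joins. The helper name with_broadcast_disabled is made up for illustration.

```python
import json

# Submission parameters as quoted in this thread.
spark_properties = {
    "numExecutors": 10,
    "executorCores": 6,
    "driverMemory": "10g",
    "executorMemory": "7g",
}


def with_broadcast_disabled(props):
    """Return a copy of props whose Spark conf turns off automatic broadcast joins."""
    updated = dict(props)
    # Copy any existing conf map so the caller's dictionary is left untouched.
    conf = dict(updated.get("conf", {}))
    # -1 disables size-based automatic broadcast joins in Spark SQL.
    conf["spark.sql.autoBroadcastJoinThreshold"] = "-1"
    updated["conf"] = conf
    return updated


payload = with_broadcast_disabled(spark_properties)
print(json.dumps(payload, indent=2))
```

Note that the trace shows BroadcastNestedLoopJoinExec. Spark can fall back to that operator when it finds no usable equality keys in the join condition, so this setting alone may not change the plan; inspecting the physical plan would confirm which join is chosen.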

On Mon, Dec 3, 2018, 9:20 AM Dhiren Sangani <dh...@enquero.com>
wrote:

> Hi Lionel,
>
> Thanks for the reply.
>
> Below are the parameters being used while submitting the job.
>
> "numExecutors": 10,
> "executorCores": 6,
> "driverMemory": "10g",
> "executorMemory": "7g"
>
> I have 2 data nodes configured with YARN, each with 80G of memory. Each
> server has 128G of physical RAM.
> My data has no hourly partitions; it is partitioned at day level only.
>
> I tried removing the extra columns from both tables (source and target)
> using spark-sql rules, and after that the job succeeded.
> Each table has 23 columns; source and target have the same number of
> columns.
>
> But if I use the data with all the columns, the job fails.
>
> Thanks,
> Dhiren
>
> From: Lionel Liu <li...@apache.org>
> Sent: Monday, December 3, 2018 1:35 AM
> To: dev@griffin.apache.org; Dhiren Sangani <dh...@enquero.com>
> Cc: dev@griffin.incubator.apache.org
> Subject: Re: Accuracy measure fails on large dataset
>
> Hi Dhiren,
>
> How many resources are you using when you submit this job?
> For large-scale data, the simple solution is to use more resources for the
> calculation job.
> With limited resources, a common solution is to partition your data by date
> or hour so that each partition is smaller; you can then run the calculation
> on one partition at a time.
>
> Thanks,
> Lionel
>
>
>
>

RE: Accuracy measure fails on large dataset

Posted by "Lionel, Liu" <bh...@163.com>.
Hi Dhiren,

It seems that resources are not your limiting factor.
I suspect there may be some skew in your data; you can check this by monitoring the running status of the Spark job.
If there is data skew, there are several ways to fix it, which you can also search for online.

Thanks
Lionel, Liu
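
As a complement to watching the Spark job status, skew can also be estimated by counting rows per join-key value. The Python sketch below does this on an in-memory sample; it is a different check from the monitoring suggested above, and the function name skew_report and the sample values are hypothetical, not data from this thread. On the real tables the same counts would come from a GROUP BY over the rule's join columns.

```python
from collections import Counter


def skew_report(keys, top_n=3):
    """Summarise how unevenly rows are spread over join-key values."""
    counts = Counter(keys)
    if not counts:
        raise ValueError("no keys to analyse")
    mean = sum(counts.values()) / len(counts)
    heaviest = counts.most_common(top_n)
    return {
        "distinct_keys": len(counts),
        "mean_rows_per_key": mean,
        # A ratio far above 1 means a few keys carry most of the rows.
        "max_over_mean": heaviest[0][1] / mean,
        "heaviest": heaviest,
    }


# Hypothetical sample of (column1, column2) pairs.
sample = [("a", 1)] * 96 + [("b", 2)] * 2 + [("c", 3)] * 2
report = skew_report(sample)
print(report)
```

A key that appears far more often than the average ends up in a single shuffle partition during a join, which is one common reason a single task runs much longer than the rest.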

From: Dhiren Sangani
Sent: December 4, 2018 1:20
To: Lionel Liu; dev@griffin.apache.org
Cc: dev@griffin.incubator.apache.org
Subject: RE: Accuracy measure fails on large dataset

Hi Lionel,

Thanks for the reply.

Below are the parameters being used while submitting the job.

"numExecutors": 10,
"executorCores": 6,
"driverMemory": "10g",
"executorMemory": "7g"

I have 2 data nodes configured with YARN, each with 80G of memory. Each server has 128G of physical RAM.
My data has no hourly partitions; it is partitioned at day level only.

I tried removing the extra columns from both tables (source and target) using spark-sql rules, and after that the job succeeded.
Each table has 23 columns; source and target have the same number of columns.

But if I use the data with all the columns, the job fails.

Thanks,
Dhiren

From: Lionel Liu <li...@apache.org>
Sent: Monday, December 3, 2018 1:35 AM
To: dev@griffin.apache.org; Dhiren Sangani <dh...@enquero.com>
Cc: dev@griffin.incubator.apache.org
Subject: Re: Accuracy measure fails on large dataset

Hi Dhiren,

How many resources are you using when you submit this job?
For large-scale data, the simple solution is to use more resources for the calculation job.
With limited resources, a common solution is to partition your data by date or hour so that each partition is smaller; you can then run the calculation on one partition at a time.

Thanks,
Lionel






RE: Accuracy measure fails on large dataset

Posted by Dhiren Sangani <dh...@enquero.com>.
Hi Lionel,

Thanks for the reply.

Below are the parameters being used while submitting the job.

"numExecutors": 10,
"executorCores": 6,
"driverMemory": "10g",
"executorMemory": "7g"

I have 2 data nodes configured with YARN, each with 80G of memory. Each server has 128G of physical RAM.
My data has no hourly partitions; it is partitioned at day level only.

I tried removing the extra columns from both tables (source and target) using spark-sql rules, and after that the job succeeded.
Each table has 23 columns; source and target have the same number of columns.

But if I use the data with all the columns, the job fails.

Thanks,
Dhiren
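
For a rough sense of scale, the Python sketch below adds up the memory these parameters request from YARN and compares it with the 2 x 80G configured. It assumes Spark's default per-container memory overhead (10% of the heap, with a 384 MiB minimum) has not been overridden, and it ignores YARN's rounding of container sizes; both are simplifications, and the function name container_gib is made up for illustration.

```python
def container_gib(heap_gib, overhead_fraction=0.10, min_overhead_gib=0.375):
    """Approximate YARN container size: heap plus Spark's default memory overhead."""
    return heap_gib + max(heap_gib * overhead_fraction, min_overhead_gib)


executors = 10                      # numExecutors
executor_gib = container_gib(7)     # executorMemory = 7g
driver_gib = container_gib(10)      # driverMemory = 10g

requested_gib = executors * executor_gib + driver_gib
yarn_gib = 2 * 80                   # two data nodes, 80G each for YARN
share = requested_gib / yarn_gib

print(f"requested: {requested_gib:.1f} GiB of {yarn_gib} GiB ({share:.0%})")
print(f"task slots: {executors * 6}")  # executorCores = 6
```

Under these assumptions the job asks for a little over half of the memory YARN can hand out, so there is headroom to raise executor memory or the executor count before the cluster itself becomes the limit.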

From: Lionel Liu <li...@apache.org>
Sent: Monday, December 3, 2018 1:35 AM
To: dev@griffin.apache.org; Dhiren Sangani <dh...@enquero.com>
Cc: dev@griffin.incubator.apache.org
Subject: Re: Accuracy measure fails on large dataset

Hi Dhiren,

How many resources are you using when you submit this job?
For large-scale data, the simple solution is to use more resources for the calculation job.
With limited resources, a common solution is to partition your data by date or hour so that each partition is smaller; you can then run the calculation on one partition at a time.

Thanks,
Lionel




On Mon, Dec 3, 2018 at 3:33 PM Dhiren Sangani <dh...@enquero.com> wrote:
Hi Dev Team,

I am facing an issue with the Accuracy measure while running a job on a larger dataset (1B records).
I have source and target tables in Hive and am using a simple accuracy rule, as below:

Source.Column1 = target.column1 AND source.column2 = target.column2

I get the exception below while running the job.

18/11/30 21:44:12 ERROR MetricWriteStep: get metric accuracy fails
java.util.concurrent.TimeoutException: Futures timed out after [3600 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
        at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:136)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:144)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:140)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:140)
        at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.doExecute(BroadcastNestedLoopJoinExec.scala:343)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
        at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
        at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
        at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
        at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:89)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.MapPartitionsExec.doExecute(objects.scala:185)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
        at org.apache.spark.sql.execution.SerializeFromObjectExec.inputRDDs(objects.scala:110)
        at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
        at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:294)
        at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3278)
        at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2727)
        at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2727)
        at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3259)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258)
        at org.apache.spark.sql.Dataset.collect(Dataset.scala:2727)
        at org.apache.griffin.measure.step.write.MetricWriteStep.getMetricMaps(MetricWriteStep.scala:86)
        at org.apache.griffin.measure.step.write.MetricWriteStep.execute(MetricWriteStep.scala:46)
        at org.apache.griffin.measure.step.SeqDQStep$$anonfun$execute$1.apply(SeqDQStep.scala:37)
        at org.apache.griffin.measure.step.SeqDQStep$$anonfun$execute$1.apply(SeqDQStep.scala:36)
        at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
        at scala.collection.immutable.List.foldLeft(List.scala:84)
        at org.apache.griffin.measure.step.SeqDQStep.execute(SeqDQStep.scala:36)
        at org.apache.griffin.measure.job.DQJob$$anonfun$execute$1.apply(DQJob.scala:31)
        at org.apache.griffin.measure.job.DQJob$$anonfun$execute$1.apply(DQJob.scala:30)
        at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
        at scala.collection.immutable.List.foldLeft(List.scala:84)
        at org.apache.griffin.measure.job.DQJob.execute(DQJob.scala:30)
        at org.apache.griffin.measure.launch.batch.BatchDQApp$$anonfun$run$1.apply$mcZ$sp(BatchDQApp.scala:103)
        at org.apache.griffin.measure.launch.batch.BatchDQApp$$anonfun$run$1.apply(BatchDQApp.scala:67)
        at org.apache.griffin.measure.launch.batch.BatchDQApp$$anonfun$run$1.apply(BatchDQApp.scala:67)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.griffin.measure.launch.batch.BatchDQApp.run(BatchDQApp.scala:67)
        at org.apache.griffin.measure.Application$.main(Application.scala:88)
        at org.apache.griffin.measure.Application.main(Application.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$4.run(ApplicationMaster.scala:721)
18/11/30 21:44:12 INFO MetricWriteStep: metricMaps => List()

I tried running the job with half of the dataset (500M records) and it ran successfully.
Below is the metric log from that run.
18/11/30 20:37:31 INFO MetricWriteStep: metricMaps => WrappedArray(Map(total -> 508880897, miss -> 0, matched -> 508880897, matchedFraction -> 1.0))

But if I try to run it with even 700M records, it fails.
18/11/30 21:44:12 ERROR MetricWriteStep: get metric accuracy fails
18/11/30 21:44:12 INFO MetricWriteStep: metricMaps => List()

Is this related to the join query used inside AccuracyExpr2DQSteps.scala to calculate the miss records?

Any pointers will be appreciated.

Thanks,
Dhiren

Re: Accuracy measure fails on large dataset

Posted by Lionel Liu <li...@apache.org>.
Hi Dhiren,

How many resources are you using when you submit this job?
For large-scale data, the simplest solution is to give the calculation job
more resources.
With limited resources, a common solution is to partition your data by date
or hour so that each partition is smaller, then run the calculation on one
partition at a time.
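
If the job is run once per partition, the partial results still have to be
combined into one overall figure. A minimal sketch of that step, assuming each
run reports the same "total" and "miss" counts seen in the metric log above;
the function name and the sample numbers are hypothetical and not part of
Griffin:

```python
from typing import Dict, Iterable, Union

Metric = Dict[str, Union[int, float]]


def merge_accuracy_metrics(partials: Iterable[Dict[str, int]]) -> Metric:
    """Combine per-partition accuracy counts into one overall metric."""
    total = 0
    miss = 0
    for partial in partials:
        total += partial["total"]
        miss += partial["miss"]
    matched = total - miss
    # Guard against empty input so we never divide by zero.
    matched_fraction = matched / total if total else 0.0
    return {
        "total": total,
        "miss": miss,
        "matched": matched,
        "matchedFraction": matched_fraction,
    }


# Hypothetical per-partition results (illustrative numbers, not a real run).
partials = [
    {"total": 1000, "miss": 10},
    {"total": 3000, "miss": 30},
]
overall = merge_accuracy_metrics(partials)
print(overall)
```

Summing the counts first and dividing once weights each partition by its size;
averaging the per-partition matchedFraction values would not.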

Thanks,
Lionel



