You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Jack Wenger <ja...@gmail.com> on 2016/09/14 16:47:55 UTC

ACID transactions on data added from Spark not working

Hi there,

I'm trying to use ACID transactions in Hive but I have a problem when the
data are added with Spark.

First, I created a table with the following statement :
____________________________________________________________
__________________________
CREATE TABLE testdb.test(id string, col1 string)
CLUSTERED BY (id) INTO 4 BUCKETS
STORED AS ORC TBLPROPERTIES('transactional'='true');
____________________________________________________________
__________________________

Then I added data with those queries :
____________________________________________________________
__________________________
INSERT INTO testdb.test VALUES("1", "A");
INSERT INTO testdb.test VALUES("2", "B");
INSERT INTO testdb.test VALUES("3", "C");
____________________________________________________________
__________________________

And I've been able to delete rows with this query :
____________________________________________________________
__________________________
DELETE FROM testdb.test WHERE id="1";
____________________________________________________________
__________________________

All that worked perfectly, but a problem occurs when I try to delete rows
that were added with Spark.

What I do in Spark (iPython) :
____________________________________________________________
__________________________
hc = HiveContext(sc)
data = sc.parallelize([["1", "A"], ["2", "B"], ["3", "C"]])
data_df = hc.createDataFrame(data)
data_df.registerTempTable(data_df)
hc.sql("INSERT INTO testdb.test SELECT * FROM data_df");
____________________________________________________________
__________________________

Then, when I come back to Hive, I'm able to run a SELECT query on this the
"test" table.
However, when I try to run the exact same DELETE query as before, I have
the following error (it happens after the reduce phase) :

____________________________________________________________
__________________________

Error: java.lang.RuntimeException:
org.apache.hadoop.hive.ql.metadata.HiveException:
Hive Runtime Error while processing row (tag=0)
{"key":{"reducesinkkey0":{"transactionid":0,"bucketid":-1,"
rowid":0}},"value":null}
at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecRed
ucer.java:265)
at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:444)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGro
upInformation.java:1671)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime
Error while processing row (tag=0) {"key":{"reducesinkkey0":{"tra
nsactionid":0,"bucketid":-1,"rowid":0}},"value":null}
at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecRed
ucer.java:253)
... 7 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
at org.apache.hadoop.hive.ql.exec.FileSinkOperator.processOp(
FileSinkOperator.java:723)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:815)
at org.apache.hadoop.hive.ql.exec.SelectOperator.processOp(Sele
ctOperator.java:84)
at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecRed
ucer.java:244)
... 7 more

____________________________________________________________
__________________________

I have no idea where this is coming from, that is why I'm looking for
advices on this mailing list.

I'm using the Cloudera Quickstart VM (5.4.2).
Hive version : 1.1.0
Spark Version : 1.3.0

And here is the complete output of the Hive DELETE command :
____________________________________________________________
__________________________

hive> delete from testdb.test where id="1";

Query ID = cloudera_20160914090303_795e40b7-ab6a-45b0-8391-6d41d1cfe7bd
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 4
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1473858545651_0036, Tracking URL =
http://quickstart.cloudera:8088/proxy/application_1473858545651_0036/
Kill Command = /usr/lib/hadoop/bin/hadoop job  -kill job_1473858545651_0036
Hadoop job information for Stage-1: number of mappers: 2; number of
reducers: 4
2016-09-14 09:03:55,571 Stage-1 map = 0%,  reduce = 0%
2016-09-14 09:04:14,898 Stage-1 map = 50%,  reduce = 0%, Cumulative CPU
1.66 sec
2016-09-14 09:04:15,944 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU
3.33 sec
2016-09-14 09:04:44,101 Stage-1 map = 100%,  reduce = 17%, Cumulative CPU
4.21 sec
2016-09-14 09:04:46,523 Stage-1 map = 100%,  reduce = 25%, Cumulative CPU
4.79 sec
2016-09-14 09:04:47,673 Stage-1 map = 100%,  reduce = 42%, Cumulative CPU
5.8 sec
2016-09-14 09:04:50,041 Stage-1 map = 100%,  reduce = 75%, Cumulative CPU
7.45 sec
2016-09-14 09:05:18,486 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU
7.69 sec
MapReduce Total cumulative CPU time: 7 seconds 690 msec
Ended Job = job_1473858545651_0036 with errors
Error during job, obtaining debugging information...
Job Tracking URL: http://quickstart.cloudera:8088/proxy/
application_1473858545651_0036/
Examining task ID: task_1473858545651_0036_m_000000 (and more) from job
job_1473858545651_0036

Task with the most failures(4):
-----
Task ID:
  task_1473858545651_0036_r_000001

URL:
  http://0.0.0.0:8088/taskdetails.jsp?jobid=job_147385854565
1_0036&tipid=task_1473858545651_0036_r_000001
-----
Diagnostic Messages for this Task:
Error: java.lang.RuntimeException:
org.apache.hadoop.hive.ql.metadata.HiveException:
Hive Runtime Error while processing row (tag=0)
{"key":{"reducesinkkey0":{"transactionid":0,"bucketid":-1,"
rowid":0}},"value":null}
at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecRed
ucer.java:265)
at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:444)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGro
upInformation.java:1671)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime
Error while processing row (tag=0) {"key":{"reducesinkkey0":{"tra
nsactionid":0,"bucketid":-1,"rowid":0}},"value":null}
at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecRed
ucer.java:253)
... 7 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
at org.apache.hadoop.hive.ql.exec.FileSinkOperator.processOp(
FileSinkOperator.java:723)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:815)
at org.apache.hadoop.hive.ql.exec.SelectOperator.processOp(Sele
ctOperator.java:84)
at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecRed
ucer.java:244)
... 7 more


FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec
.mr.MapRedTask
MapReduce Jobs Launched:
Stage-Stage-1: Map: 2  Reduce: 4   Cumulative CPU: 7.69 sec   HDFS Read:
21558 HDFS Write: 114 FAIL
Total MapReduce CPU Time Spent: 7 seconds 690 msec

____________________________________________________________
__________________________

Thanks !

Re: ACID transactions on data added from Spark not working

Posted by Mich Talebzadeh <mi...@gmail.com>.
Hi,

I believe this is an issue with Spark handing transactional tables in Hive.

When you add rows from Spark to ORC transactional table, Hive metadata
tables HIVE_LOCKS and TXNS tables are not updated. This does not happen
with Hive itself. As a result these new rows are left in an inconsistent
state.

Also if you delete rows from the said tables using Hive, many delta files
will be produced. In that case Hive will compact them eventually (roll them
into the main file itself), and can read data. However, Spark will not be
able to read the delta files until all is compacted.

Anyway this is my experience.

HTH


Dr Mich Talebzadeh



LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 14 September 2016 at 17:47, Jack Wenger <ja...@gmail.com> wrote:

> Hi there,
>
> I'm trying to use ACID transactions in Hive but I have a problem when the
> data are added with Spark.
>
> First, I created a table with the following statement :
> ____________________________________________________________
> __________________________
> CREATE TABLE testdb.test(id string, col1 string)
> CLUSTERED BY (id) INTO 4 BUCKETS
> STORED AS ORC TBLPROPERTIES('transactional'='true');
> ____________________________________________________________
> __________________________
>
> Then I added data with those queries :
> ____________________________________________________________
> __________________________
> INSERT INTO testdb.test VALUES("1", "A");
> INSERT INTO testdb.test VALUES("2", "B");
> INSERT INTO testdb.test VALUES("3", "C");
> ____________________________________________________________
> __________________________
>
> And I've been able to delete rows with this query :
> ____________________________________________________________
> __________________________
> DELETE FROM testdb.test WHERE id="1";
> ____________________________________________________________
> __________________________
>
> All that worked perfectly, but a problem occurs when I try to delete rows
> that were added with Spark.
>
> What I do in Spark (iPython) :
> ____________________________________________________________
> __________________________
> hc = HiveContext(sc)
> data = sc.parallelize([["1", "A"], ["2", "B"], ["3", "C"]])
> data_df = hc.createDataFrame(data)
> data_df.registerTempTable(data_df)
> hc.sql("INSERT INTO testdb.test SELECT * FROM data_df");
> ____________________________________________________________
> __________________________
>
> Then, when I come back to Hive, I'm able to run a SELECT query on this the
> "test" table.
> However, when I try to run the exact same DELETE query as before, I have
> the following error (it happens after the reduce phase) :
>
> ____________________________________________________________
> __________________________
>
> Error: java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException:
> Hive Runtime Error while processing row (tag=0)
> {"key":{"reducesinkkey0":{"transactionid":0,"bucketid":-1,"r
> owid":0}},"value":null}
> at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecRed
> ucer.java:265)
> at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:444)
> at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392)
> at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:415)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGro
> upInformation.java:1671)
> at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
> Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime
> Error while processing row (tag=0) {"key":{"reducesinkkey0":{"tra
> nsactionid":0,"bucketid":-1,"rowid":0}},"value":null}
> at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecRed
> ucer.java:253)
> ... 7 more
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
> at org.apache.hadoop.hive.ql.exec.FileSinkOperator.processOp(Fi
> leSinkOperator.java:723)
> at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:815)
> at org.apache.hadoop.hive.ql.exec.SelectOperator.processOp(Sele
> ctOperator.java:84)
> at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecRed
> ucer.java:244)
> ... 7 more
>
> ____________________________________________________________
> __________________________
>
> I have no idea where this is coming from, that is why I'm looking for
> advices on this mailing list.
>
> I'm using the Cloudera Quickstart VM (5.4.2).
> Hive version : 1.1.0
> Spark Version : 1.3.0
>
> And here is the complete output of the Hive DELETE command :
> ____________________________________________________________
> __________________________
>
> hive> delete from testdb.test where id="1";
>
> Query ID = cloudera_20160914090303_795e40b7-ab6a-45b0-8391-6d41d1cfe7bd
> Total jobs = 1
> Launching Job 1 out of 1
> Number of reduce tasks determined at compile time: 4
> In order to change the average load for a reducer (in bytes):
>   set hive.exec.reducers.bytes.per.reducer=<number>
> In order to limit the maximum number of reducers:
>   set hive.exec.reducers.max=<number>
> In order to set a constant number of reducers:
>   set mapreduce.job.reduces=<number>
> Starting Job = job_1473858545651_0036, Tracking URL =
> http://quickstart.cloudera:8088/proxy/application_1473858545651_0036/
> Kill Command = /usr/lib/hadoop/bin/hadoop job  -kill job_1473858545651_0036
> Hadoop job information for Stage-1: number of mappers: 2; number of
> reducers: 4
> 2016-09-14 09:03:55,571 Stage-1 map = 0%,  reduce = 0%
> 2016-09-14 09:04:14,898 Stage-1 map = 50%,  reduce = 0%, Cumulative CPU
> 1.66 sec
> 2016-09-14 09:04:15,944 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU
> 3.33 sec
> 2016-09-14 09:04:44,101 Stage-1 map = 100%,  reduce = 17%, Cumulative CPU
> 4.21 sec
> 2016-09-14 09:04:46,523 Stage-1 map = 100%,  reduce = 25%, Cumulative CPU
> 4.79 sec
> 2016-09-14 09:04:47,673 Stage-1 map = 100%,  reduce = 42%, Cumulative CPU
> 5.8 sec
> 2016-09-14 09:04:50,041 Stage-1 map = 100%,  reduce = 75%, Cumulative CPU
> 7.45 sec
> 2016-09-14 09:05:18,486 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU
> 7.69 sec
> MapReduce Total cumulative CPU time: 7 seconds 690 msec
> Ended Job = job_1473858545651_0036 with errors
> Error during job, obtaining debugging information...
> Job Tracking URL: http://quickstart.cloudera:8088/proxy/application_
> 1473858545651_0036/
> Examining task ID: task_1473858545651_0036_m_000000 (and more) from job
> job_1473858545651_0036
>
> Task with the most failures(4):
> -----
> Task ID:
>   task_1473858545651_0036_r_000001
>
> URL:
>   http://0.0.0.0:8088/taskdetails.jsp?jobid=job_147385854565
> 1_0036&tipid=task_1473858545651_0036_r_000001
> -----
> Diagnostic Messages for this Task:
> Error: java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException:
> Hive Runtime Error while processing row (tag=0)
> {"key":{"reducesinkkey0":{"transactionid":0,"bucketid":-1,"r
> owid":0}},"value":null}
> at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecRed
> ucer.java:265)
> at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:444)
> at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392)
> at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:415)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGro
> upInformation.java:1671)
> at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
> Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime
> Error while processing row (tag=0) {"key":{"reducesinkkey0":{"tra
> nsactionid":0,"bucketid":-1,"rowid":0}},"value":null}
> at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecRed
> ucer.java:253)
> ... 7 more
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
> at org.apache.hadoop.hive.ql.exec.FileSinkOperator.processOp(Fi
> leSinkOperator.java:723)
> at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:815)
> at org.apache.hadoop.hive.ql.exec.SelectOperator.processOp(Sele
> ctOperator.java:84)
> at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecRed
> ucer.java:244)
> ... 7 more
>
>
> FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec
> .mr.MapRedTask
> MapReduce Jobs Launched:
> Stage-Stage-1: Map: 2  Reduce: 4   Cumulative CPU: 7.69 sec   HDFS Read:
> 21558 HDFS Write: 114 FAIL
> Total MapReduce CPU Time Spent: 7 seconds 690 msec
>
> ____________________________________________________________
> __________________________
>
> Thanks !
>