Posted to user@flume.apache.org by Thanh Hong Dai <hd...@tma.com.vn> on 2016/07/05 04:47:26 UTC

Unable to deliver event. org.apache.flume.EventDeliveryException: java.lang.IllegalStateException: TransactionBatch has been closed()

Does anyone know the cause of this exception when using the Hive Sink, and how
to fix it?

 

The Hive Sink managed to write data into the Hive table for a few minutes
(which I can confirm by querying the table), but then the exception below
appears in the log file (/var/log/flume/flume-<streamname>.log) on all the
nodes.

 

05 Jul 2016 04:24:22,737 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.IllegalStateException: TransactionBatch TxnIds=[29489...30488] on endPoint = {metaStoreUri='thrift://hive.metastore:9083', database='default', table='acid', partitionVals=[0804] } has been closed()
        at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:375)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: TransactionBatch TxnIds=[29489...30488] on endPoint = {metaStoreUri='thrift://hive.metastore:9083', database='default', table='acid', partitionVals=[0804] } has been closed()
        at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.checkIsClosed(HiveEndPoint.java:690)
        at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:729)
        at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:686)
        at org.apache.flume.sink.hive.HiveDelimitedTextSerializer.write(HiveDelimitedTextSerializer.java:48)
        at org.apache.flume.sink.hive.HiveWriter$1.call(HiveWriter.java:161)
        at org.apache.flume.sink.hive.HiveWriter$1.call(HiveWriter.java:155)
        at org.apache.flume.sink.hive.HiveWriter$11.call(HiveWriter.java:425)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        ... 1 more
05 Jul 2016 04:24:27,891 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.IllegalStateException: TransactionBatch TxnIds=[29489...30488] on endPoint = {metaStoreUri='thrift://hive.metastore:9083', database='default', table='acid', partitionVals=[0804] } has been closed()
        at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:375)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: TransactionBatch TxnIds=[29489...30488] on endPoint = {metaStoreUri='thrift://hive.metastore:9083', database='default', table='acid', partitionVals=[0804] } has been closed()
        at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.checkIsClosed(HiveEndPoint.java:690)
        at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:729)
        at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:686)
        at org.apache.flume.sink.hive.HiveDelimitedTextSerializer.write(HiveDelimitedTextSerializer.java:48)
        at org.apache.flume.sink.hive.HiveWriter$1.call(HiveWriter.java:161)
        at org.apache.flume.sink.hive.HiveWriter$1.call(HiveWriter.java:155)
        at org.apache.flume.sink.hive.HiveWriter$11.call(HiveWriter.java:425)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        ... 1 more

 

My flume.conf file:

 

# acidstream - streaming data from Kafka into Hive transactional table

acidstream.sources = kafka-source

acidstream.sinks = hive-sink

acidstream.channels = gutter

 

acidstream.sources.kafka-source.channels = gutter

acidstream.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource

acidstream.sources.kafka-source.zookeeperConnect = chdhost125.vitaldev.tma.com.vn:2181,chdhost27.vitaldev.tma.com.vn:2181,chdhost185.vitaldev.tma.com.vn:2181

acidstream.sources.kafka-source.topic = lan

acidstream.sources.kafka-source.groupId = acid

acidstream.sources.kafka-source.batchSize = 10000

acidstream.sources.kafka-source.batchDurationMillis = 60000

acidstream.sources.kafka-source.kafka.consumer.timeout.ms = 200

 

acidstream.sources.kafka-source.interceptors = i1

acidstream.sources.kafka-source.interceptors.i1.type = regex_extractor

acidstream.sources.kafka-source.interceptors.i1.regex = ^(\\d{4}-\\d{2}-\\d{2}\\s\\d{2}:\\d{2}:\\d{2})

acidstream.sources.kafka-source.interceptors.i1.serializers = s1

acidstream.sources.kafka-source.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer

acidstream.sources.kafka-source.interceptors.i1.serializers.s1.name = timestamp

acidstream.sources.kafka-source.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm:ss

 

acidstream.sinks.hive-sink.channel = gutter

acidstream.sinks.hive-sink.type = hive

acidstream.sinks.hive-sink.hive.metastore = thrift://hive.metastore:9083

acidstream.sinks.hive-sink.hive.database = default

acidstream.sinks.hive-sink.hive.table = acid

acidstream.sinks.hive-sink.hive.partition = %m%d

acidstream.sinks.hive-sink.heartBeatInterval = 10

acidstream.sinks.hive-sink.useLocalTimeStamp = false

acidstream.sinks.hive-sink.round = false

acidstream.sinks.hive-sink.hive.txnsPerBatchAsk = 1000

acidstream.sinks.hive-sink.batchSize = 10000

acidstream.sinks.hive-sink.callTimeout = 30000

acidstream.sinks.hive-sink.serializer = DELIMITED

acidstream.sinks.hive-sink.serializer.delimiter = "\t"

acidstream.sinks.hive-sink.serializer.serdeSeparator = '\t'

acidstream.sinks.hive-sink.serializer.fieldnames = timestamp,id,data

 

acidstream.channels.gutter.type = memory

acidstream.channels.gutter.capacity = 100000

acidstream.channels.gutter.transactionCapacity = 50000
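
As a sanity check on the interceptor settings above, here is a minimal Python sketch of what the regex_extractor plus RegexExtractorInterceptorMillisSerializer chain is configured to produce. This is an illustration under assumptions, not Flume's actual code: in particular, UTC is assumed here, whereas the real serializer parses with the JVM's default time zone.

```python
import re
from datetime import datetime, timezone

# What the i1 interceptor chain is configured to do (sketch): match the
# leading "yyyy-MM-dd HH:mm:ss" timestamp in each event body and attach it
# as an epoch-millis "timestamp" header, which the sink's %m%d partition
# escape then resolves against.
TS_REGEX = re.compile(r"^(\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2})")
TS_PATTERN = "%Y-%m-%d %H:%M:%S"  # Python spelling of "yyyy-MM-dd HH:mm:ss"

def extract_timestamp_header(body: str) -> dict:
    """Headers the interceptor would attach (UTC assumed; Flume itself
    uses the JVM default time zone)."""
    m = TS_REGEX.match(body)
    if not m:
        return {}  # no leading timestamp, so no header is set
    dt = datetime.strptime(m.group(1), TS_PATTERN).replace(tzinfo=timezone.utc)
    return {"timestamp": str(int(dt.timestamp() * 1000))}

print(extract_timestamp_header("2016-07-05 04:24:22 42 hello"))
```

Note that events whose bodies do not start with a timestamp get no header, in which case the %m%d escape in hive.partition cannot resolve.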

 

My flume-env file has this line added:

 

export JAVA_OPTS="-Xms100m -Xmx3g"

 

My table on Hive has the following properties:

 

PARTITIONED BY (md string)

CLUSTERED BY (id) INTO 10 BUCKETS

STORED AS ORC

TBLPROPERTIES ('transactional' = 'true');

 

Hive has Tez engine set as the default execution engine.

 

Could this error be caused by a low number of threads? (The NameNode has 100
server threads available.)
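
For scale, a rough back-of-envelope on the sink settings in the configuration above, under the assumption (per the Flume Hive Sink documentation) that the sink commits up to batchSize events per Hive transaction and requests a TransactionBatch of hive.txnsPerBatchAsk transactions at a time:

```python
# Back-of-envelope sizing for the hive-sink settings (assumption: one Hive
# transaction per drained batch of up to batchSize events, and one
# TransactionBatch spanning hive.txnsPerBatchAsk such transactions).
txns_per_batch_ask = 1000   # hive.txnsPerBatchAsk
batch_size = 10_000         # batchSize (events per transaction)

# Upper bound on events carried by a single TransactionBatch before the
# sink must request a new one.
events_per_transaction_batch = txns_per_batch_ask * batch_size
print(events_per_transaction_batch)  # -> 10000000
```

So a single TransactionBatch can stay open across up to ten million events, which is a long time for a heartbeat or metastore hiccup to close it underneath the writer.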

 

Best regards,

Thanh Hong.

 


RE: Unable to deliver event. org.apache.flume.EventDeliveryException: java.lang.IllegalStateException: TransactionBatch has been closed()

Posted by Thanh Hong Dai <hd...@tma.com.vn>.
Thanks for your reply. I guess I will have to wait for HDP to include the new version of Hive.

 

Best regards,

Thanh Hong.

 



Re: Unable to deliver event. org.apache.flume.EventDeliveryException: java.lang.IllegalStateException: TransactionBatch has been closed()

Posted by Joe Lawson <jl...@opensourceconnections.com>.
You may want to look here:
https://community.hortonworks.com/content/kbentry/4321/hive-acid-current-state.html

Flume 1.5.2 doesn't include Hive support AFAIK, so whatever they built for
Hortonworks is their own build. Note that the sink docs say, "This sink
is provided as a preview feature and not recommended for use in
production." (
https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.3.0/ds_flume/FlumeUserGuide.html)
 It appears they are using Hive version 1.2.1. Not sure what version the
sink lines up to.

Looking here: http://hortonworks.com/blog/adding-acid-to-apache-hive/

It appears that Hive gained support for ACID inserts in 0.14.0 (
https://issues.apache.org/jira/browse/HIVE-5317) but has a bug (
https://issues.apache.org/jira/browse/HIVE-12307?jql=project%20%3D%20HIVE%20AND%20text%20~%20%22TransactionBatch%20closed%22
) about transaction batches closing that is fixed in Hive 1.3.0.


RE: Unable to deliver event. org.apache.flume.EventDeliveryException: java.lang.IllegalStateException: TransactionBatch has been closed()

Posted by Thanh Hong Dai <hd...@tma.com.vn>.
I forgot to include the version information. I'm currently using Flume 1.5.2
from HDP 2.4.2.

 

Looking at the changelog of Flume 1.6.0, the latest version, there seem to
be some improvements for Hive support.

This makes me wonder: does Flume 1.5.2 support Hive streaming to ACID
tables?

 

Best regards,

Thanh Hong.

 
