Posted to user@hive.apache.org by Eugene Koifman <ek...@hortonworks.com> on 2016/08/05 22:19:20 UTC

Re: Hive compaction didn't launch

Support for transactions in Hive is not just for Storm.  You can run transactional SQL statements.  So the system must support cases where all actions within a transaction are not known at the start of the transaction.

From: Igor Kuzmenko <f1...@gmail.com>
Reply-To: "user@hive.apache.org" <us...@hive.apache.org>
Date: Friday, July 29, 2016 at 4:43 AM
To: "user@hive.apache.org" <us...@hive.apache.org>
Subject: Re: Hive compaction didn't launch

Here's how Storm works right now:

After receiving a new message, Storm determines which partition it should be written to. Then it checks whether there is an open connection to that HiveEndPoint<https://github.com/apache/hive/blob/release-1.2.1/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java>; if not, it creates one and fetches a new transaction batch. Here I assume that these transactions can only be used to write data to one HiveEndPoint, because when we fetch a transaction batch we pass a RecordWriter to the fetch method (StreamingConnection::fetchTransactionBatch<https://github.com/apache/hive/blob/release-1.2.1/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java>). So I don't see a case like "After they write to A they may choose to write to B and then commit". It seems that the Streaming API doesn't support this.
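For reference, this is roughly the usage pattern of the streaming API that Storm builds on (a minimal sketch against the Hive 1.2.1 hcatalog-streaming classes; the metastore URI, table, columns and partition value below are made up for illustration):

import java.util.Arrays;

import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.StreamingConnection;
import org.apache.hive.hcatalog.streaming.TransactionBatch;

public class StreamingSketch {
    public static void main(String[] args) throws Exception {
        // One endpoint corresponds to one table partition.
        HiveEndPoint endPoint = new HiveEndPoint(
                "thrift://metastore-host:9083",            // hypothetical metastore URI
                "default", "data_aaa", Arrays.asList("20160726"));
        StreamingConnection conn = endPoint.newConnection(true);
        DelimitedInputWriter writer =
                new DelimitedInputWriter(new String[]{"col1", "col2"}, ",", endPoint);

        // The batch is fetched together with a writer bound to this endpoint,
        // so its transactions only ever write into this one partition.
        TransactionBatch txnBatch = conn.fetchTransactionBatch(10, writer);
        txnBatch.beginNextTransaction();
        txnBatch.write("a,b".getBytes());
        txnBatch.commit();
        txnBatch.close();
        conn.close();
    }
}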

Storm keeps receiving messages, and when its message buffer is full or a fixed period of time has passed it flushes all the messages (performing a commit in terms of Hive Streaming).
And here's the interesting part: if there's nothing to flush, Storm does nothing (HiveWriter<https://github.com/apache/storm/blob/v1.0.0/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java>):


public void flush(boolean rollToNext)
        throws CommitFailure, TxnBatchFailure, TxnFailure, InterruptedException {
    // if there are no records do not call flush
    if (totalRecords <= 0) return;
    try {
        synchronized(txnBatchLock) {
            commitTxn();
            nextTxn(rollToNext);
            totalRecords = 0;
            lastUsed = System.currentTimeMillis();
        }
    } catch(StreamingException e) {
        throw new TxnFailure(txnBatch, e);
    }
}

At the same time, Storm keeps all fetched transactions alive from a separate thread by sending heartbeats (HiveBolt<https://github.com/apache/storm/blob/v1.0.0/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java>):

private void setupHeartBeatTimer() {
    if(options.getHeartBeatInterval()>0) {
        heartBeatTimer.schedule(new TimerTask() {
            @Override
            public void run() {
                try {
                    if (sendHeartBeat.get()) {
                        LOG.debug("Start sending heartbeat on all writers");
                        sendHeartBeatOnAllWriters();
                        setupHeartBeatTimer();
                    }
                } catch (Exception e) {
                    LOG.warn("Failed to heartbeat on HiveWriter ", e);
                }
            }
        }, options.getHeartBeatInterval() * 1000);
    }
}
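What a heartbeat round amounts to is roughly the following (a hedged sketch, not the actual HiveBolt/HiveWriter code; openBatches is a hypothetical collection of the transaction batches currently held open):

import java.util.Collection;

import org.apache.hive.hcatalog.streaming.TransactionBatch;

class HeartbeatSketch {
    static void heartbeatAll(Collection<TransactionBatch> openBatches) {
        for (TransactionBatch txnBatch : openBatches) {
            try {
                // Renews the metastore locks for every transaction in the batch,
                // so the transactions stay open - and an open transaction keeps
                // the compactor from picking up the corresponding deltas.
                txnBatch.heartbeat();
            } catch (Exception e) {
                // The real bolt only logs heartbeat failures, as in the code above.
            }
        }
    }
}

As long as those heartbeats keep arriving, the metastore never times the transactions out, which is exactly why an idle connection never releases them.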

The only way an idle connection will be closed is by hitting the excess-connections limit, which is a configurable parameter, but I can't control that event explicitly. Making the transaction batch smaller doesn't help either: even if the batch size is 1, after flushing the data Storm will fetch another transaction batch and wait for new messages, which may not come for a long time.

I don't see any way to fix this problem with configuration alone; I need to make changes in the Hive or Storm code. The question is where that would be more appropriate.
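One possible Storm-side direction, in line with Alan's suggestion further down to commit on some frequency even without data, would be to stop short-circuiting empty flushes. This is only a hedged sketch of such a change to the flush() method shown above, not an actual patch, and it assumes the streaming API is fine with committing an empty transaction:

public void flush(boolean rollToNext)
        throws CommitFailure, TxnBatchFailure, TxnFailure, InterruptedException {
    try {
        synchronized(txnBatchLock) {
            // Commit even when totalRecords == 0, so the current transaction is
            // closed and the batch keeps advancing instead of staying open.
            commitTxn();
            nextTxn(rollToNext);
            totalRecords = 0;
            lastUsed = System.currentTimeMillis();
        }
    } catch(StreamingException e) {
        throw new TxnFailure(txnBatch, e);
    }
}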




On Fri, Jul 29, 2016 at 8:15 AM, Eugene Koifman <ek...@hortonworks.com> wrote:
I think Storm has some timeout parameter that will close the transaction
if there are no events for a certain amount of time.
How many transactions do you use per transaction batch?  Perhaps making the
batches smaller will make them close sooner.

Eugene


On 7/28/16, 3:59 PM, "Alan Gates" <al...@gmail.com> wrote:

>But until those transactions are closed you don't know that they won't
>write to partition B.  After they write to A they may choose to write to
>B and then commit.  The compactor can not make any assumptions about what
>sessions with open transactions will do in the future.
>
>Alan.
>
>> On Jul 28, 2016, at 09:19, Igor Kuzmenko <f1...@gmail.com> wrote:
>>
>> But this minOpenTxn value isn't from the delta I want to compact.
>>minOpenTxn can point to a transaction in partition A while partition B
>>has deltas ready for compaction. If minOpenTxn is less than the txnIds
>>in partition B's deltas, compaction won't happen. So an open transaction
>>in partition A blocks compaction in partition B. That seems wrong to me.
>>
>> On Thu, Jul 28, 2016 at 7:06 PM, Alan Gates <al...@gmail.com>
>>wrote:
>> Hive is doing the right thing there, as it cannot compact the deltas
>>into a base file while there are still open transactions in the delta.
>>Storm should be committing on some frequency even if it doesn't have
>>enough data to commit.
>>
>> Alan.
>>
>> > On Jul 28, 2016, at 05:36, Igor Kuzmenko <f1...@gmail.com> wrote:
>> >
>> > I did some research on that issue.
>> > The problem is in the ValidCompactorTxnList::isTxnRangeValid method.
>> >
>> > Here's the code:
>> > @Override
>> > public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId) {
>> >   if (highWatermark < minTxnId) {
>> >     return RangeResponse.NONE;
>> >   } else if (minOpenTxn < 0) {
>> >     return highWatermark >= maxTxnId ? RangeResponse.ALL :
>>RangeResponse.NONE;
>> >   } else {
>> >     return minOpenTxn > maxTxnId ? RangeResponse.ALL :
>>RangeResponse.NONE;
>> >   }
>> > }
>> >
>> > In my case this method returned RangeResponse.NONE for most of the
>>delta files. With that value a delta file is not included in compaction.
>> >
>> > The last 'else' block compares minOpenTxn to maxTxnId, and if maxTxnId
>>is bigger it returns RangeResponse.NONE. That's a problem for me because
>>I'm using the Storm Hive Bolt. The Hive Bolt gets a transaction and keeps
>>it open with heartbeats until there's data to commit.
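To make that concrete, here is a small self-contained reproduction of the decision above with made-up transaction ids and one delta name from the listing further down (this is not the Hive class itself; only the two outcomes these branches can return are modeled):

public class MinOpenTxnExample {
    enum RangeResponse { NONE, ALL }

    // Same decision logic as the method quoted above, with the fields passed in.
    static RangeResponse isTxnRangeValid(long highWatermark, long minOpenTxn,
                                         long minTxnId, long maxTxnId) {
        if (highWatermark < minTxnId) {
            return RangeResponse.NONE;
        } else if (minOpenTxn < 0) {
            return highWatermark >= maxTxnId ? RangeResponse.ALL : RangeResponse.NONE;
        } else {
            return minOpenTxn > maxTxnId ? RangeResponse.ALL : RangeResponse.NONE;
        }
    }

    public static void main(String[] args) {
        long highWatermark = 72100000L;   // made-up high watermark
        long minOpenTxn    = 71741300L;   // a streaming txn held open in partition A
        // delta_71762456_71762555 is fully committed in partition B, but
        // minOpenTxn <= maxTxnId, so the last branch returns NONE.
        System.out.println(isTxnRangeValid(highWatermark, minOpenTxn,
                71762456L, 71762555L));   // prints NONE -> delta excluded
    }
}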
>> >
>> > So if I get a transaction and keep it open, all compactions will
>>stop. Is this incorrect Hive behavior, or should Storm close the transaction?
>> >
>> >
>> >
>> >
>> > On Wed, Jul 27, 2016 at 8:46 PM, Igor Kuzmenko <f1...@gmail.com>
>>wrote:
>> > Thanks for the reply, Alan. My guess about Storm was wrong. Today I got
>>the same behavior with the Storm topology running.
>> > Anyway, I'd like to know: how can I check that a transaction batch was
>>closed correctly?
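One way to inspect this from Java is to ask the metastore directly. This is only a hedged sketch against the Hive 1.2 metastore client (SHOW COMPACTIONS gives a similar view from HiveQL):

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
import org.apache.hadoop.hive.metastore.api.TxnInfo;

public class TxnStateCheck {
    public static void main(String[] args) throws Exception {
        HiveMetaStoreClient client = new HiveMetaStoreClient(new HiveConf());
        try {
            // Transactions the metastore still tracks (open and aborted);
            // the open ones are what hold back minOpenTxn and thus compaction.
            GetOpenTxnsInfoResponse txns = client.showTxns();
            for (TxnInfo txn : txns.getOpen_txns()) {
                System.out.println(txn);
            }
            // Compaction requests and their current states.
            ShowCompactResponse compactions = client.showCompactions();
            for (ShowCompactResponseElement c : compactions.getCompacts()) {
                System.out.println(c);
            }
        } finally {
            client.close();
        }
    }
}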
>> >
>> > On Wed, Jul 27, 2016 at 8:09 PM, Alan Gates <al...@gmail.com>
>>wrote:
>> > I don't know the details of how the storm application that streams
>>into Hive works, but this sounds like the transaction batches weren't
>>getting closed.  Compaction can't happen until those batches are closed.
>> Do you know how you had storm configured?  Also, you might ask
>>separately on the storm list to see if people have seen this issue
>>before.
>> >
>> > Alan.
>> >
>> > > On Jul 27, 2016, at 03:31, Igor Kuzmenko <f1...@gmail.com> wrote:
>> > >
>> > > One more thing. I'm using Apache Storm to stream data into Hive, and
>>when I turned off the Storm topology, compactions started to work properly.
>> > >
>> > > On Tue, Jul 26, 2016 at 6:28 PM, Igor Kuzmenko <f1...@gmail.com>
>>wrote:
>> > > I'm using a Hive 1.2.1 transactional table and inserting data into it
>>via the Hive Streaming API. After some time I expected compaction to start,
>>but it didn't happen:
>> > >
>> > > Here's the part of the log which shows that the compactor initiator
>>thread doesn't see any delta files:
>> > > 2016-07-26 18:06:52,459 INFO  [Thread-8]: compactor.Initiator
>>(Initiator.java:run(89)) - Checking to see if we should compact
>>default.data_aaa.dt=20160726
>> > > 2016-07-26 18:06:52,496 DEBUG [Thread-8]: io.AcidUtils
>>(AcidUtils.java:getAcidState(432)) - in directory
>>hdfs://sorm-master01.msk.mts.ru:8020/apps/hive/warehouse/data_aaa/dt=2016
>>0726 base = null deltas = 0
>> > > 2016-07-26 18:06:52,496 DEBUG [Thread-8]: compactor.Initiator
>>(Initiator.java:determineCompactionType(271)) - delta size: 0 base size:
>>0 threshold: 0.1 will major compact: false
>> > >
>> > > But in that directory there's actually 23 files:
>> > >
>> > > hadoop fs -ls /apps/hive/warehouse/data_aaa/dt=20160726
>> > > Found 23 items
>> > > -rw-r--r--   3 storm hdfs          4 2016-07-26 17:20
>>/apps/hive/warehouse/data_aaa/dt=20160726/_orc_acid_version
>> > > drwxrwxrwx   - storm hdfs          0 2016-07-26 17:22
>>/apps/hive/warehouse/data_aaa/dt=20160726/delta_71741256_71741355
>> > > drwxrwxrwx   - storm hdfs          0 2016-07-26 17:23
>>/apps/hive/warehouse/data_aaa/dt=20160726/delta_71762456_71762555
>> > > drwxrwxrwx   - storm hdfs          0 2016-07-26 17:25
>>/apps/hive/warehouse/data_aaa/dt=20160726/delta_71787756_71787855
>> > > drwxrwxrwx   - storm hdfs          0 2016-07-26 17:26
>>/apps/hive/warehouse/data_aaa/dt=20160726/delta_71795756_71795855
>> > > drwxrwxrwx   - storm hdfs          0 2016-07-26 17:27
>>/apps/hive/warehouse/data_aaa/dt=20160726/delta_71804656_71804755
>> > > drwxrwxrwx   - storm hdfs          0 2016-07-26 17:29
>>/apps/hive/warehouse/data_aaa/dt=20160726/delta_71828856_71828955
>> > > drwxrwxrwx   - storm hdfs          0 2016-07-26 17:30
>>/apps/hive/warehouse/data_aaa/dt=20160726/delta_71846656_71846755
>> > > drwxrwxrwx   - storm hdfs          0 2016-07-26 17:32
>>/apps/hive/warehouse/data_aaa/dt=20160726/delta_71850756_71850855
>> > > drwxrwxrwx   - storm hdfs          0 2016-07-26 17:33
>>/apps/hive/warehouse/data_aaa/dt=20160726/delta_71867356_71867455
>> > > drwxrwxrwx   - storm hdfs          0 2016-07-26 17:34
>>/apps/hive/warehouse/data_aaa/dt=20160726/delta_71891556_71891655
>> > > drwxrwxrwx   - storm hdfs          0 2016-07-26 17:36
>>/apps/hive/warehouse/data_aaa/dt=20160726/delta_71904856_71904955
>> > > drwxrwxrwx   - storm hdfs          0 2016-07-26 17:37
>>/apps/hive/warehouse/data_aaa/dt=20160726/delta_71907256_71907355
>> > > drwxrwxrwx   - storm hdfs          0 2016-07-26 17:39
>>/apps/hive/warehouse/data_aaa/dt=20160726/delta_71918756_71918855
>> > > drwxrwxrwx   - storm hdfs          0 2016-07-26 17:40
>>/apps/hive/warehouse/data_aaa/dt=20160726/delta_71947556_71947655
>> > > drwxrwxrwx   - storm hdfs          0 2016-07-26 17:41
>>/apps/hive/warehouse/data_aaa/dt=20160726/delta_71960656_71960755
>> > > drwxrwxrwx   - storm hdfs          0 2016-07-26 17:43
>>/apps/hive/warehouse/data_aaa/dt=20160726/delta_71963156_71963255
>> > > drwxrwxrwx   - storm hdfs          0 2016-07-26 17:44
>>/apps/hive/warehouse/data_aaa/dt=20160726/delta_71964556_71964655
>> > > drwxrwxrwx   - storm hdfs          0 2016-07-26 17:46
>>/apps/hive/warehouse/data_aaa/dt=20160726/delta_71987156_71987255
>> > > drwxrwxrwx   - storm hdfs          0 2016-07-26 17:47
>>/apps/hive/warehouse/data_aaa/dt=20160726/delta_72015756_72015855
>> > > drwxrwxrwx   - storm hdfs          0 2016-07-26 17:48
>>/apps/hive/warehouse/data_aaa/dt=20160726/delta_72021356_72021455
>> > > drwxrwxrwx   - storm hdfs          0 2016-07-26 17:50
>>/apps/hive/warehouse/data_aaa/dt=20160726/delta_72048756_72048855
>> > > drwxrwxrwx   - storm hdfs          0 2016-07-26 17:50
>>/apps/hive/warehouse/data_aaa/dt=20160726/delta_72070856_72070955
>> > >
>> > > Full log here.
>> > >
>> > > What could go wrong?
>> > >
>> >
>> >
>> >
>>
>>
>
>