Posted to issues@hive.apache.org by "Vinuraj M (JIRA)" <ji...@apache.org> on 2016/08/02 07:10:20 UTC

[jira] [Commented] (HIVE-11906) IllegalStateException: Attempting to flush a RecordUpdater on....bucket_00000 with a single transaction.

    [ https://issues.apache.org/jira/browse/HIVE-11906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15403511#comment-15403511 ] 

Vinuraj M commented on HIVE-11906:
----------------------------------

I am using the Streaming ingest API to load files that arrive at regular intervals from another system. My intended design is to fetch one TransactionBatch instance per file and write that file's entire contents within a single transaction. Writing and committing one file per transaction means that on any error I can simply abort and re-process the whole file.

Because of this issue (HIVE-11906) I am forced to split each file's contents across multiple transactions, which makes error handling far more complicated than simply re-processing the whole file.
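For illustration, here is a minimal sketch of the one-file-per-transaction pattern I am describing, using the HCatalog streaming API. The metastore URI, table, abbreviated column list, file path and class name are placeholders, not my actual configuration:

{noformat}
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.StreamingConnection;
import org.apache.hive.hcatalog.streaming.TransactionBatch;

public class OneFilePerTxnLoader {
  public static void main(String[] args) throws Exception {
    // Placeholder endpoint: metastore URI, db, table and partition are examples only.
    List<String> partitionVals = Arrays.asList("2015");
    HiveEndPoint endPt = new HiveEndPoint("thrift://localhost:9083",
        "default", "store_sales", partitionVals);

    // Column list abbreviated for the sketch; the real writer lists all table columns.
    DelimitedInputWriter writer = new DelimitedInputWriter(
        new String[] {"ss_sold_date_sk", "ss_sold_time_sk", "ss_item_sk"}, ",", endPt);
    StreamingConnection connection = endPt.newConnection(false);

    // One TransactionBatch of size 1: the whole file is written and committed
    // in a single transaction, so a failure lets us re-process the entire file.
    TransactionBatch txnBatch = connection.fetchTransactionBatch(1, writer);
    try {
      txnBatch.beginNextTransaction();
      for (String line : Files.readAllLines(Paths.get("/tmp/incoming/file1.csv"))) {
        txnBatch.write(line.getBytes());
      }
      txnBatch.commit();   // this commit hits the IllegalStateException when the batch size is 1
    } catch (Exception e) {
      txnBatch.abort();    // on any error, discard the transaction and re-process the file later
      throw e;
    } finally {
      txnBatch.close();
      connection.close();
    }
  }
}
{noformat}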

> IllegalStateException: Attempting to flush a RecordUpdater on....bucket_00000 with a single transaction.
> --------------------------------------------------------------------------------------------------------
>
>                 Key: HIVE-11906
>                 URL: https://issues.apache.org/jira/browse/HIVE-11906
>             Project: Hive
>          Issue Type: Bug
>          Components: HCatalog, Transactions
>    Affects Versions: 1.0.0
>            Reporter: Eugene Koifman
>            Assignee: Varadharajan
>
> {noformat}
> java.lang.IllegalStateException: Attempting to flush a RecordUpdater on hdfs://127.0.0.1:9000/user/hive/warehouse/store_sales/dt=2015/delta_0003405_0003405/bucket_00000 with a single transaction.
> 	at org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater.flush(OrcRecordUpdater.java:341)
> 	at org.apache.hive.hcatalog.streaming.AbstractRecordWriter.flush(AbstractRecordWriter.java:124)
> 	at org.apache.hive.hcatalog.streaming.DelimitedInputWriter.flush(DelimitedInputWriter.java:49)
> 	at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.commitImpl(HiveEndPoint.java:723)
> 	at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.commit(HiveEndPoint.java:701)
> 	at org.apache.hive.acid.RueLaLaTest.test(RueLaLaTest.java:89)
> {noformat}
> {noformat}
> package org.apache.hive.acid;
> import org.apache.commons.logging.Log;
> import org.apache.commons.logging.LogFactory;
> import org.apache.hadoop.hive.common.JavaUtils;
> import org.apache.hadoop.hive.conf.HiveConf;
> import org.apache.hadoop.hive.ql.Driver;
> import org.apache.hadoop.hive.ql.session.SessionState;
> import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
> import org.apache.hive.hcatalog.streaming.HiveEndPoint;
> import org.apache.hive.hcatalog.streaming.StreamingConnection;
> import org.apache.hive.hcatalog.streaming.TransactionBatch;
> import org.junit.Test;
> import java.net.URL;
> import java.util.ArrayList;
> import java.util.List;
> /**
>  */
> public class RueLaLaTest {
>   static final private Log LOG = LogFactory.getLog(RueLaLaTest.class);
>   @Test
>   public void test() throws Exception {
>     HiveConf.setHiveSiteLocation(new URL("file:///Users/ekoifman/dev/hwxhive/packaging/target/apache-hive-0.14.0-bin/apache-hive-0.14.0-bin/conf/hive-site.xml"));
>     HiveConf hiveConf = new HiveConf(this.getClass());
>     final String workerName = "test_0";
>     SessionState.start(new SessionState(hiveConf));
>     Driver d = new Driver(hiveConf);
>     d.setMaxRows(200002);//make sure Driver returns all results
>     runStatementOnDriver(d, "drop table if exists store_sales");
>     runStatementOnDriver(d, "create table store_sales\n" +
>       "(\n" +
>       "    ss_sold_date_sk           int,\n" +
>       "    ss_sold_time_sk           int,\n" +
>       "    ss_item_sk                int,\n" +
>       "    ss_customer_sk            int,\n" +
>       "    ss_cdemo_sk               int,\n" +
>       "    ss_hdemo_sk               int,\n" +
>       "    ss_addr_sk                int,\n" +
>       "    ss_store_sk               int,\n" +
>       "    ss_promo_sk               int,\n" +
>       "    ss_ticket_number          int,\n" +
>       "    ss_quantity               int,\n" +
>       "    ss_wholesale_cost         decimal(7,2),\n" +
>       "    ss_list_price             decimal(7,2),\n" +
>       "    ss_sales_price            decimal(7,2),\n" +
>       "    ss_ext_discount_amt       decimal(7,2),\n" +
>       "    ss_ext_sales_price        decimal(7,2),\n" +
>       "    ss_ext_wholesale_cost     decimal(7,2),\n" +
>       "    ss_ext_list_price         decimal(7,2),\n" +
>       "    ss_ext_tax                decimal(7,2),\n" +
>       "    ss_coupon_amt             decimal(7,2),\n" +
>       "    ss_net_paid               decimal(7,2),\n" +
>       "    ss_net_paid_inc_tax       decimal(7,2),\n" +
>       "    ss_net_profit             decimal(7,2)\n" +
>       ")\n" +
>       " partitioned by (dt string)\n" +
>       "clustered by (ss_store_sk, ss_promo_sk)\n" +
>       "INTO 2 BUCKETS stored as orc TBLPROPERTIES ('orc.compress'='NONE', 'transactional'='true')");
>     runStatementOnDriver(d, "alter table store_sales add partition(dt='2015')");
>     LOG.info(workerName + " starting...");
>     List<String> partitionVals = new ArrayList<String>();
>     partitionVals.add("2015");
>     HiveEndPoint endPt = new HiveEndPoint(HiveConf.getVar(hiveConf, HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:9933"), "default", "store_sales", partitionVals);
>     DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"ss_sold_date_sk","ss_sold_time_sk", "ss_item_sk", 
>       "ss_customer_sk", "ss_cdemo_sk", "ss_hdemo_sk", "ss_addr_sk", "ss_store_sk", "ss_promo_sk", "ss_ticket_number", "ss_quantity", 
>       "ss_wholesale_cost", "ss_list_price", "ss_sales_price", "ss_ext_discount_amt", "ss_ext_sales_price", "ss_ext_wholesale_cost", 
>       "ss_ext_list_price", "ss_ext_tax", "ss_coupon_amt", "ss_net_paid", "ss_net_paid_inc_tax", "ss_net_profit"},",", endPt);
>     StreamingConnection connection = endPt.newConnection(false, null);//should this really be null?
>     TransactionBatch txnBatch =  connection.fetchTransactionBatch(1, writer);
>     LOG.info(workerName + " started txn batch");
>     txnBatch.beginNextTransaction();
>     LOG.info(workerName + " started commit txn " + JavaUtils.txnIdToString(txnBatch.getCurrentTxnId()));
>     StringBuilder row = new StringBuilder();
>     for(int i = 0; i < 1; i++) {
>       for(int ints = 0; ints < 11; ints++) {
>         row.append(ints).append(',');
>       }
>       for(int decs = 0; decs < 12; decs++) {
>         row.append(i + 0.1).append(',');
>       }
>       row.setLength(row.length() - 1);
>       txnBatch.write(row.toString().getBytes());
>     }
>     txnBatch.commit();
>     txnBatch.close();
>     connection.close();
>   }
>   private List<String> runStatementOnDriver(Driver d, String stmt) throws Exception {
>     return AcidSystemTest.runStatementOnDriver(d, stmt);
>   }
> }
> {noformat}
> The key part is that the TransactionBatch has size 1; a batch size greater than 1 works OK.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)