Posted to commits@hive.apache.org by pr...@apache.org on 2018/04/16 06:35:52 UTC
[1/4] hive git commit: HIVE-19210: Create separate module for
streaming ingest (Prasanth Jayachandran reviewed by Eugene Koifman)
Repository: hive
Updated Branches:
refs/heads/master 16d94fbe2 -> 6bd32a0d6
http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
----------------------------------------------------------------------
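The tests in this new file all drive the same streaming ingest flow: build a HiveEndPoint for a transactional table, open a StreamingConnection with a RecordWriter (DelimitedInputWriter, StrictJsonWriter or StrictRegexWriter), fetch a TransactionBatch, then begin/write/commit transactions and close. A minimal sketch of that flow, mirroring the calls used in the tests below (the testing.alerts table and partition values come from setup(); "example-agent" is just a placeholder agent string, and error handling is omitted):

    HiveEndPoint endPt = new HiveEndPoint(null, "testing", "alerts",
        Arrays.asList("Asia", "India"));
    StreamingConnection connection = endPt.newConnection(true, "example-agent");
    DelimitedInputWriter writer =
        new DelimitedInputWriter(new String[]{"id", "msg"}, ",", endPt, connection);
    TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
    txnBatch.beginNextTransaction();
    txnBatch.write("1,Hello streaming".getBytes());
    txnBatch.commit();
    txnBatch.close();
    connection.close();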
diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
new file mode 100644
index 0000000..6f63bfb
--- /dev/null
+++ b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
@@ -0,0 +1,2330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.BUCKET_COUNT;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.Validator;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnInfo;
+import org.apache.hadoop.hive.metastore.api.TxnState;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.DriverFactory;
+import org.apache.hadoop.hive.ql.IDriver;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
+import org.apache.hadoop.hive.ql.io.orc.RecordReader;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.txn.compactor.Worker;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableLongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector;
+import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.orc.impl.OrcAcidUtils;
+import org.apache.orc.tools.FileDump;
+import org.apache.thrift.TException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TestStreaming {
+ private static final Logger LOG = LoggerFactory.getLogger(TestStreaming.class);
+
+ public static class RawFileSystem extends RawLocalFileSystem {
+ private static final URI NAME;
+ static {
+ try {
+ NAME = new URI("raw:///");
+ } catch (URISyntaxException se) {
+ throw new IllegalArgumentException("bad uri", se);
+ }
+ }
+
+ @Override
+ public URI getUri() {
+ return NAME;
+ }
+
+
+ @Override
+ public FileStatus getFileStatus(Path path) throws IOException {
+ File file = pathToFile(path);
+ if (!file.exists()) {
+ throw new FileNotFoundException("Can't find " + path);
+ }
+ // get close enough
+ short mod = 0;
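+ // POSIX-style permission bits: 0444 = read for all, 0200 = owner write, 0111 = execute for all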
+ if (file.canRead()) {
+ mod |= 0444;
+ }
+ if (file.canWrite()) {
+ mod |= 0200;
+ }
+ if (file.canExecute()) {
+ mod |= 0111;
+ }
+ return new FileStatus(file.length(), file.isDirectory(), 1, 1024,
+ file.lastModified(), file.lastModified(),
+ FsPermission.createImmutable(mod), "owen", "users", path);
+ }
+ }
+
+ private static final String COL1 = "id";
+ private static final String COL2 = "msg";
+
+ private final HiveConf conf;
+ private IDriver driver;
+ private final IMetaStoreClient msClient;
+
+ final String metaStoreURI = null;
+
+ // partitioned table
+ private final static String dbName = "testing";
+ private final static String tblName = "alerts";
+ private final static String[] fieldNames = new String[]{COL1,COL2};
+ List<String> partitionVals;
+ private static Path partLoc;
+ private static Path partLoc2;
+
+ // unpartitioned table
+ private final static String dbName2 = "testing2";
+ private final static String tblName2 = "alerts";
+ private final static String[] fieldNames2 = new String[]{COL1,COL2};
+
+
+ // for bucket join testing
+ private final static String dbName3 = "testing3";
+ private final static String tblName3 = "dimensionTable";
+ private final static String dbName4 = "testing4";
+ private final static String tblName4 = "factTable";
+ List<String> partitionVals2;
+
+
+ private final String PART1_CONTINENT = "Asia";
+ private final String PART1_COUNTRY = "India";
+
+ @Rule
+ public TemporaryFolder dbFolder = new TemporaryFolder();
+
+
+ public TestStreaming() throws Exception {
+ partitionVals = new ArrayList<String>(2);
+ partitionVals.add(PART1_CONTINENT);
+ partitionVals.add(PART1_COUNTRY);
+
+ partitionVals2 = new ArrayList<String>(1);
+ partitionVals2.add(PART1_COUNTRY);
+
+
+ conf = new HiveConf(this.getClass());
+ conf.set("fs.raw.impl", RawFileSystem.class.getName());
+ conf
+ .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+ "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
+ TxnDbUtil.setConfValues(conf);
+ if (metaStoreURI!=null) {
+ conf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreURI);
+ }
+ conf.setBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true);
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
+ dbFolder.create();
+
+
+ //1) Start from a clean slate (metastore)
+ TxnDbUtil.cleanDb(conf);
+ TxnDbUtil.prepDb(conf);
+
+ //2) obtain metastore clients
+ msClient = new HiveMetaStoreClient(conf);
+ }
+
+ @Before
+ public void setup() throws Exception {
+ SessionState.start(new CliSessionState(conf));
+ driver = DriverFactory.newDriver(conf);
+ driver.setMaxRows(200002);//make sure Driver returns all results
+ // drop and recreate the necessary databases and tables
+ dropDB(msClient, dbName);
+
+ String[] colNames = new String[] {COL1, COL2};
+ String[] colTypes = new String[] {serdeConstants.INT_TYPE_NAME, serdeConstants.STRING_TYPE_NAME};
+ String[] bucketCols = new String[] {COL1};
+ String loc1 = dbFolder.newFolder(dbName + ".db").toString();
+ String[] partNames = new String[]{"Continent", "Country"};
+ partLoc = createDbAndTable(driver, dbName, tblName, partitionVals, colNames, colTypes, bucketCols, partNames, loc1, 1);
+
+ dropDB(msClient, dbName2);
+ String loc2 = dbFolder.newFolder(dbName2 + ".db").toString();
+ partLoc2 = createDbAndTable(driver, dbName2, tblName2, null, colNames, colTypes, bucketCols, null, loc2, 2);
+
+ String loc3 = dbFolder.newFolder("testing5.db").toString();
+ createStoreSales("testing5", loc3);
+
+ runDDL(driver, "drop table testBucketing3.streamedtable");
+ runDDL(driver, "drop table testBucketing3.finaltable");
+ runDDL(driver, "drop table testBucketing3.nobucket");
+ }
+
+ @After
+ public void cleanup() throws Exception {
+ msClient.close();
+ driver.close();
+ }
+
+ private static List<FieldSchema> getPartitionKeys() {
+ List<FieldSchema> fields = new ArrayList<FieldSchema>();
+ // Defining partition names in unsorted order
+ fields.add(new FieldSchema("continent", serdeConstants.STRING_TYPE_NAME, ""));
+ fields.add(new FieldSchema("country", serdeConstants.STRING_TYPE_NAME, ""));
+ return fields;
+ }
+
+ private void createStoreSales(String dbName, String loc) throws Exception {
+ String dbUri = "raw://" + new Path(loc).toUri().toString();
+ String tableLoc = dbUri + Path.SEPARATOR + "store_sales";
+
+ boolean success = runDDL(driver, "create database IF NOT EXISTS " + dbName + " location '" + dbUri + "'");
+ Assert.assertTrue(success);
+ success = runDDL(driver, "use " + dbName);
+ Assert.assertTrue(success);
+
+ success = runDDL(driver, "drop table if exists store_sales");
+ Assert.assertTrue(success);
+ success = runDDL(driver, "create table store_sales\n" +
+ "(\n" +
+ " ss_sold_date_sk int,\n" +
+ " ss_sold_time_sk int,\n" +
+ " ss_item_sk int,\n" +
+ " ss_customer_sk int,\n" +
+ " ss_cdemo_sk int,\n" +
+ " ss_hdemo_sk int,\n" +
+ " ss_addr_sk int,\n" +
+ " ss_store_sk int,\n" +
+ " ss_promo_sk int,\n" +
+ " ss_ticket_number int,\n" +
+ " ss_quantity int,\n" +
+ " ss_wholesale_cost decimal(7,2),\n" +
+ " ss_list_price decimal(7,2),\n" +
+ " ss_sales_price decimal(7,2),\n" +
+ " ss_ext_discount_amt decimal(7,2),\n" +
+ " ss_ext_sales_price decimal(7,2),\n" +
+ " ss_ext_wholesale_cost decimal(7,2),\n" +
+ " ss_ext_list_price decimal(7,2),\n" +
+ " ss_ext_tax decimal(7,2),\n" +
+ " ss_coupon_amt decimal(7,2),\n" +
+ " ss_net_paid decimal(7,2),\n" +
+ " ss_net_paid_inc_tax decimal(7,2),\n" +
+ " ss_net_profit decimal(7,2)\n" +
+ ")\n" +
+ " partitioned by (dt string)\n" +
+ "clustered by (ss_store_sk, ss_promo_sk)\n" +
+ "INTO 4 BUCKETS stored as orc " + " location '" + tableLoc + "'" + " TBLPROPERTIES ('orc.compress'='NONE', 'transactional'='true')");
+ Assert.assertTrue(success);
+
+ success = runDDL(driver, "alter table store_sales add partition(dt='2015')");
+ Assert.assertTrue(success);
+ }
+ /**
+ * make sure it works with a table where the bucket column is not the first column
+ * @throws Exception
+ */
+ @Test
+ public void testBucketingWhereBucketColIsNotFirstCol() throws Exception {
+ List<String> partitionVals = new ArrayList<String>();
+ partitionVals.add("2015");
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testing5", "store_sales", partitionVals);
+ StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+ DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"ss_sold_date_sk","ss_sold_time_sk", "ss_item_sk",
+ "ss_customer_sk", "ss_cdemo_sk", "ss_hdemo_sk", "ss_addr_sk", "ss_store_sk", "ss_promo_sk", "ss_ticket_number", "ss_quantity",
+ "ss_wholesale_cost", "ss_list_price", "ss_sales_price", "ss_ext_discount_amt", "ss_ext_sales_price", "ss_ext_wholesale_cost",
+ "ss_ext_list_price", "ss_ext_tax", "ss_coupon_amt", "ss_net_paid", "ss_net_paid_inc_tax", "ss_net_profit"},",", endPt, connection);
+
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
+ txnBatch.beginNextTransaction();
+
+ StringBuilder row = new StringBuilder();
+ for(int i = 0; i < 10; i++) {
+ for(int ints = 0; ints < 11; ints++) {
+ row.append(ints).append(',');
+ }
+ for(int decs = 0; decs < 12; decs++) {
+ row.append(i + 0.1).append(',');
+ }
+ row.setLength(row.length() - 1);
+ txnBatch.write(row.toString().getBytes());
+ }
+ txnBatch.commit();
+ txnBatch.close();
+ connection.close();
+
+ ArrayList<String> res = queryTable(driver, "select row__id.bucketid, * from testing5.store_sales");
+ for (String re : res) {
+ System.out.println(re);
+ }
+ }
+
+ /**
+ * Test that streaming can write to unbucketed table.
+ */
+ @Test
+ public void testNoBuckets() throws Exception {
+ queryTable(driver, "drop table if exists default.streamingnobuckets");
+ //todo: why does it need transactional_properties?
+ queryTable(driver, "create table default.streamingnobuckets (a string, b string) stored as orc TBLPROPERTIES('transactional'='true', 'transactional_properties'='default')");
+ queryTable(driver, "insert into default.streamingnobuckets values('foo','bar')");
+ List<String> rs = queryTable(driver, "select * from default.streamingnobuckets");
+ Assert.assertEquals(1, rs.size());
+ Assert.assertEquals("foo\tbar", rs.get(0));
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "default", "streamingnobuckets", null);
+ String[] colNames1 = new String[] { "a", "b" };
+ StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+ DelimitedInputWriter wr = new DelimitedInputWriter(colNames1,",", endPt, connection);
+
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(2, wr);
+ txnBatch.beginNextTransaction();
+ txnBatch.write("a1,b2".getBytes());
+ txnBatch.write("a3,b4".getBytes());
+ txnBatch.commit();
+ txnBatch.beginNextTransaction();
+ txnBatch.write("a5,b6".getBytes());
+ txnBatch.write("a7,b8".getBytes());
+ txnBatch.commit();
+ txnBatch.close();
+
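+ // 536870912 == 0x20000000, the BucketCodec-encoded value whose decoded writer (bucket) id is 0;
+ // the same value appears in the expected ROW__ID bucketid fields asserted below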
+ Assert.assertEquals("", 0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912));
+ rs = queryTable(driver,"select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID");
+
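+ // delta directories are named delta_<min write id>_<max write id>(_<statement id>);
+ // the streaming batch's two transactions land in delta_0000002_0000003 below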
+ Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
+ Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/delta_0000001_0000001_0000/bucket_00000"));
+ Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\ta1\tb2"));
+ Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
+ Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
+ Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
+ Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
+ Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
+ Assert.assertTrue(rs.get(4), rs.get(4).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\ta7\tb8"));
+ Assert.assertTrue(rs.get(4), rs.get(4).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
+
+ queryTable(driver, "update default.streamingnobuckets set a=0, b=0 where a='a7'");
+ queryTable(driver, "delete from default.streamingnobuckets where a='a1'");
+ rs = queryTable(driver, "select a, b from default.streamingnobuckets order by a, b");
+ int row = 0;
+ Assert.assertEquals("at row=" + row, "0\t0", rs.get(row++));
+ Assert.assertEquals("at row=" + row, "a3\tb4", rs.get(row++));
+ Assert.assertEquals("at row=" + row, "a5\tb6", rs.get(row++));
+ Assert.assertEquals("at row=" + row, "foo\tbar", rs.get(row++));
+
+ queryTable(driver, "alter table default.streamingnobuckets compact 'major'");
+ runWorker(conf);
+ rs = queryTable(driver,"select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID");
+
+ Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
+ Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/base_0000005/bucket_00000"));
+ Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
+ Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/base_0000005/bucket_00000"));
+ Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
+ Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/base_0000005/bucket_00000"));
+ Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t0\t0"));
+ Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/base_0000005/bucket_00000"));
+ }
+
+ /**
+ * this is a clone from TestTxnStatement2....
+ */
+ public static void runWorker(HiveConf hiveConf) throws MetaException {
+ AtomicBoolean stop = new AtomicBoolean(true);
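+ // stop=true makes the compactor Worker run a single pass and exit rather than loop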
+ Worker t = new Worker();
+ t.setThreadId((int) t.getId());
+ t.setConf(hiveConf);
+ AtomicBoolean looped = new AtomicBoolean();
+ t.init(stop, looped);
+ t.run();
+ }
+
+ // stream data into streaming table with N buckets, then copy the data into another bucketed table
+ // check if bucketing in both was done in the same way
+ @Test
+ public void testStreamBucketingMatchesRegularBucketing() throws Exception {
+ int bucketCount = 100;
+
+ String dbUri = "raw://" + new Path(dbFolder.newFolder().toString()).toUri().toString();
+ String tableLoc = "'" + dbUri + Path.SEPARATOR + "streamedtable" + "'";
+ String tableLoc2 = "'" + dbUri + Path.SEPARATOR + "finaltable" + "'";
+ String tableLoc3 = "'" + dbUri + Path.SEPARATOR + "nobucket" + "'";
+
+ runDDL(driver, "create database testBucketing3");
+ runDDL(driver, "use testBucketing3");
+ runDDL(driver, "create table streamedtable ( key1 string,key2 int,data string ) clustered by ( key1,key2 ) into "
+ + bucketCount + " buckets stored as orc location " + tableLoc + " TBLPROPERTIES ('transactional'='true')") ;
+// In the 'nobucket' table we capture the bucketid from streamedtable to work around a Hive bug that prevents joining two identically bucketed tables
+ runDDL(driver, "create table nobucket ( bucketid int, key1 string,key2 int,data string ) location " + tableLoc3) ;
+ runDDL(driver, "create table finaltable ( bucketid int, key1 string,key2 int,data string ) clustered by ( key1,key2 ) into "
+ + bucketCount + " buckets stored as orc location " + tableLoc2 + " TBLPROPERTIES ('transactional'='true')");
+
+
+ String[] records = new String[] {
+ "PSFAHYLZVC,29,EPNMA",
+ "PPPRKWAYAU,96,VUTEE",
+ "MIAOFERCHI,3,WBDSI",
+ "CEGQAZOWVN,0,WCUZL",
+ "XWAKMNSVQF,28,YJVHU",
+ "XBWTSAJWME,2,KDQFO",
+ "FUVLQTAXAY,5,LDSDG",
+ "QTQMDJMGJH,6,QBOMA",
+ "EFLOTLWJWN,71,GHWPS",
+ "PEQNAOJHCM,82,CAAFI",
+ "MOEKQLGZCP,41,RUACR",
+ "QZXMCOPTID,37,LFLWE",
+ "EYALVWICRD,13,JEZLC",
+ "VYWLZAYTXX,16,DMVZX",
+ "OSALYSQIXR,47,HNZVE",
+ "JGKVHKCEGQ,25,KSCJB",
+ "WQFMMYDHET,12,DTRWA",
+ "AJOVAYZKZQ,15,YBKFO",
+ "YAQONWCUAU,31,QJNHZ",
+ "DJBXUEUOEB,35,IYCBL"
+ };
+
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testBucketing3", "streamedtable", null);
+ String[] colNames1 = new String[] { "key1", "key2", "data" };
+ DelimitedInputWriter wr = new DelimitedInputWriter(colNames1,",", endPt);
+ StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(2, wr);
+ txnBatch.beginNextTransaction();
+
+ for (String record : records) {
+ txnBatch.write(record.toString().getBytes());
+ }
+
+ txnBatch.commit();
+ txnBatch.close();
+ connection.close();
+
+ ArrayList<String> res1 = queryTable(driver, "select row__id.bucketid, * from streamedtable order by key2");
+ for (String re : res1) {
+ System.out.println(re);
+ }
+
+ driver.run("insert into nobucket select row__id.bucketid,* from streamedtable");
+ runDDL(driver, " insert into finaltable select * from nobucket");
+ ArrayList<String> res2 = queryTable(driver, "select row__id.bucketid,* from finaltable where row__id.bucketid<>bucketid");
+ for (String s : res2) {
+ LOG.error(s);
+ }
+ Assert.assertTrue(res2.isEmpty());
+ }
+
+
+ @Test
+ public void testTableValidation() throws Exception {
+ int bucketCount = 100;
+
+ String dbUri = "raw://" + new Path(dbFolder.newFolder().toString()).toUri().toString();
+ String tbl1 = "validation1";
+ String tbl2 = "validation2";
+
+ String tableLoc = "'" + dbUri + Path.SEPARATOR + tbl1 + "'";
+ String tableLoc2 = "'" + dbUri + Path.SEPARATOR + tbl2 + "'";
+
+ runDDL(driver, "create database testBucketing3");
+ runDDL(driver, "use testBucketing3");
+
+ runDDL(driver, "create table " + tbl1 + " ( key1 string, data string ) clustered by ( key1 ) into "
+ + bucketCount + " buckets stored as orc location " + tableLoc + " TBLPROPERTIES ('transactional'='false')") ;
+
+ runDDL(driver, "create table " + tbl2 + " ( key1 string, data string ) clustered by ( key1 ) into "
+ + bucketCount + " buckets stored as orc location " + tableLoc2 + " TBLPROPERTIES ('transactional'='false')") ;
+
+
+ try {
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testBucketing3", "validation1", null);
+ endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+ Assert.assertTrue("InvalidTable exception was not thrown", false);
+ } catch (InvalidTable e) {
+ // expecting this exception
+ }
+ try {
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testBucketing3", "validation2", null);
+ endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+ Assert.assertTrue("InvalidTable exception was not thrown", false);
+ } catch (InvalidTable e) {
+ // expecting this exception
+ }
+ }
+
+ /**
+ * @deprecated use {@link #checkDataWritten2(Path, long, long, int, String, boolean, String...)} -
+ * there is little value in using InputFormat directly
+ */
+ @Deprecated
+ private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int buckets, int numExpectedFiles,
+ String... records) throws Exception {
+ ValidWriteIdList writeIds = msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName));
+ AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, writeIds);
+ Assert.assertEquals(0, dir.getObsolete().size());
+ Assert.assertEquals(0, dir.getOriginalFiles().size());
+ List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
+ System.out.println("Files found: ");
+ for (AcidUtils.ParsedDelta pd : current) {
+ System.out.println(pd.getPath().toString());
+ }
+ Assert.assertEquals(numExpectedFiles, current.size());
+
+ // find the absolute minimum and maximum write ids
+ long min = Long.MAX_VALUE;
+ long max = Long.MIN_VALUE;
+ for (AcidUtils.ParsedDelta pd : current) {
+ if (pd.getMaxWriteId() > max) {
+ max = pd.getMaxWriteId();
+ }
+ if (pd.getMinWriteId() < min) {
+ min = pd.getMinWriteId();
+ }
+ }
+ Assert.assertEquals(minTxn, min);
+ Assert.assertEquals(maxTxn, max);
+
+ InputFormat inf = new OrcInputFormat();
+ JobConf job = new JobConf();
+ job.set("mapred.input.dir", partitionPath.toString());
+ job.set(BUCKET_COUNT, Integer.toString(buckets));
+ job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, "id,msg");
+ job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "bigint:string");
+ AcidUtils.setAcidOperationalProperties(job, true, null);
+ job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
+ job.set(ValidWriteIdList.VALID_WRITEIDS_KEY, writeIds.toString());
+ InputSplit[] splits = inf.getSplits(job, buckets);
+ Assert.assertEquals(numExpectedFiles, splits.length);
+ org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr =
+ inf.getRecordReader(splits[0], job, Reporter.NULL);
+
+ NullWritable key = rr.createKey();
+ OrcStruct value = rr.createValue();
+ for (String record : records) {
+ Assert.assertEquals(true, rr.next(key, value));
+ Assert.assertEquals(record, value.toString());
+ }
+ Assert.assertEquals(false, rr.next(key, value));
+ }
+ /**
+ * @param validationQuery query to read from table to compare data against {@code records}
+ * @param records expected data. Each row is a CSV list of values
+ */
+ private void checkDataWritten2(Path partitionPath, long minTxn, long maxTxn, int numExpectedFiles,
+ String validationQuery, boolean vectorize, String... records) throws Exception {
+ ValidWriteIdList txns = msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName));
+ AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, txns);
+ Assert.assertEquals(0, dir.getObsolete().size());
+ Assert.assertEquals(0, dir.getOriginalFiles().size());
+ List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
+ System.out.println("Files found: ");
+ for (AcidUtils.ParsedDelta pd : current) {
+ System.out.println(pd.getPath().toString());
+ }
+ Assert.assertEquals(numExpectedFiles, current.size());
+
+ // find the absolute minimum and maximum write ids
+ long min = Long.MAX_VALUE;
+ long max = Long.MIN_VALUE;
+ for (AcidUtils.ParsedDelta pd : current) {
+ if (pd.getMaxWriteId() > max) {
+ max = pd.getMaxWriteId();
+ }
+ if (pd.getMinWriteId() < min) {
+ min = pd.getMinWriteId();
+ }
+ }
+ Assert.assertEquals(minTxn, min);
+ Assert.assertEquals(maxTxn, max);
+ boolean isVectorizationEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED);
+ if(vectorize) {
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
+ }
+
+ String currStrategy = conf.getVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY);
+ for(String strategy : ((Validator.StringSet)HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.getValidator()).getExpected()) {
+ //run it with each split strategy - the results must not differ between strategies
+ conf.setVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY, strategy.toUpperCase());
+ List<String> actualResult = queryTable(driver, validationQuery);
+ for (int i = 0; i < actualResult.size(); i++) {
+ Assert.assertEquals("diff at [" + i + "]. actual=" + actualResult + " expected=" +
+ Arrays.toString(records), records[i], actualResult.get(i));
+ }
+ }
+ conf.setVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY, currStrategy);
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorizationEnabled);
+ }
+
+ private void checkNothingWritten(Path partitionPath) throws Exception {
+ ValidWriteIdList writeIds = msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName));
+ AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, writeIds);
+ Assert.assertEquals(0, dir.getObsolete().size());
+ Assert.assertEquals(0, dir.getOriginalFiles().size());
+ List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
+ Assert.assertEquals(0, current.size());
+ }
+
+ @Test
+ public void testEndpointConnection() throws Exception {
+ // For partitioned table, partitionVals are specified
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
+ StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); //shouldn't throw
+ connection.close();
+
+ // For unpartitioned table, partitionVals are not specified
+ endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
+ endPt.newConnection(false, "UT_" + Thread.currentThread().getName()).close(); // should not throw
+
+ // For partitioned table, partitionVals are not specified
+ try {
+ endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, null);
+ connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());
+ Assert.assertTrue("ConnectionError was not thrown", false);
+ connection.close();
+ } catch (ConnectionError e) {
+ // expecting this exception
+ String errMsg = "doesn't specify any partitions for partitioned table";
+ Assert.assertTrue(e.toString().endsWith(errMsg));
+ }
+
+ // For unpartitioned table, partition values are specified
+ try {
+ endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, partitionVals);
+ connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+ Assert.assertTrue("ConnectionError was not thrown", false);
+ connection.close();
+ } catch (ConnectionError e) {
+ // expecting this exception
+ String errMsg = "specifies partitions for unpartitioned table";
+ Assert.assertTrue(e.toString().endsWith(errMsg));
+ }
+ }
+
+ @Test
+ public void testAddPartition() throws Exception {
+ List<String> newPartVals = new ArrayList<String>(2);
+ newPartVals.add(PART1_CONTINENT);
+ newPartVals.add("Nepal");
+
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName
+ , newPartVals);
+
+ // Ensure partition is absent
+ try {
+ msClient.getPartition(endPt.database, endPt.table, endPt.partitionVals);
+ Assert.assertTrue("Partition already exists", false);
+ } catch (NoSuchObjectException e) {
+ // expect this exception
+ }
+
+ // Create partition
+ Assert.assertNotNull(endPt.newConnection(true, "UT_" + Thread.currentThread().getName()));
+
+ // Ensure partition is present
+ Partition p = msClient.getPartition(endPt.database, endPt.table, endPt.partitionVals);
+ Assert.assertNotNull("Did not find added partition", p);
+ }
+
+ @Test
+ public void testTransactionBatchEmptyCommit() throws Exception {
+ // 1) to partitioned table
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+ partitionVals);
+ StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+ DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt, connection);
+
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
+
+ txnBatch.beginNextTransaction();
+ txnBatch.commit();
+ Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+ , txnBatch.getCurrentTransactionState());
+ txnBatch.close();
+ connection.close();
+
+ // 2) To unpartitioned table
+ endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
+ writer = new DelimitedInputWriter(fieldNames2,",", endPt);
+ connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+
+ txnBatch = connection.fetchTransactionBatch(10, writer);
+ txnBatch.beginNextTransaction();
+ txnBatch.commit();
+ Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+ , txnBatch.getCurrentTransactionState());
+ txnBatch.close();
+ connection.close();
+ }
+
+ /**
+ * check that transactions that have not heartbeated and have timed out get properly aborted
+ * @throws Exception
+ */
+ @Test
+ public void testTimeOutReaper() throws Exception {
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
+ DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames2,",", endPt);
+ StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(5, writer);
+ txnBatch.beginNextTransaction();
+ conf.setTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_START, 0, TimeUnit.SECONDS);
+ //ensure txn times out
+ conf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 1, TimeUnit.MILLISECONDS);
+ AcidHouseKeeperService houseKeeperService = new AcidHouseKeeperService();
+ houseKeeperService.setConf(conf);
+ houseKeeperService.run();
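+ // the housekeeper's run() aborts (reaps) open transactions whose heartbeat is older than HIVE_TXN_TIMEOUT (1 ms above)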
+ try {
+ //should fail because the TransactionBatch timed out
+ txnBatch.commit();
+ }
+ catch(TransactionError e) {
+ Assert.assertTrue("Expected aborted transaction", e.getCause() instanceof TxnAbortedException);
+ }
+ txnBatch.close();
+ txnBatch = connection.fetchTransactionBatch(10, writer);
+ txnBatch.beginNextTransaction();
+ txnBatch.commit();
+ txnBatch.beginNextTransaction();
+ houseKeeperService.run();
+ try {
+ //should fail because the TransactionBatch timed out
+ txnBatch.commit();
+ }
+ catch(TransactionError e) {
+ Assert.assertTrue("Expected aborted transaction", e.getCause() instanceof TxnAbortedException);
+ }
+ txnBatch.close();
+ connection.close();
+ }
+
+ @Test
+ public void testHeartbeat() throws Exception {
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
+ StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+ DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames2,",", endPt, connection);
+
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(5, writer);
+ txnBatch.beginNextTransaction();
+ //todo: this should ideally check Transaction heartbeat as well, but heartbeat
+ //timestamp is not reported yet
+ //GetOpenTxnsInfoResponse txnresp = msClient.showTxns();
+ ShowLocksRequest request = new ShowLocksRequest();
+ request.setDbname(dbName2);
+ request.setTablename(tblName2);
+ ShowLocksResponse response = msClient.showLocks(request);
+ Assert.assertEquals("Wrong nubmer of locks: " + response, 1, response.getLocks().size());
+ ShowLocksResponseElement lock = response.getLocks().get(0);
+ long acquiredAt = lock.getAcquiredat();
+ long heartbeatAt = lock.getLastheartbeat();
+ txnBatch.heartbeat();
+ response = msClient.showLocks(request);
+ Assert.assertEquals("Wrong number of locks2: " + response, 1, response.getLocks().size());
+ lock = response.getLocks().get(0);
+ Assert.assertEquals("Acquired timestamp didn't match", acquiredAt, lock.getAcquiredat());
+ Assert.assertTrue("Expected new heartbeat (" + lock.getLastheartbeat() +
+ ") == old heartbeat(" + heartbeatAt +")", lock.getLastheartbeat() == heartbeatAt);
+ txnBatch.close();
+ int txnBatchSize = 200;
+ txnBatch = connection.fetchTransactionBatch(txnBatchSize, writer);
+ for(int i = 0; i < txnBatchSize; i++) {
+ txnBatch.beginNextTransaction();
+ if(i % 47 == 0) {
+ txnBatch.heartbeat();
+ }
+ if(i % 10 == 0) {
+ txnBatch.abort();
+ }
+ else {
+ txnBatch.commit();
+ }
+ if(i % 37 == 0) {
+ txnBatch.heartbeat();
+ }
+ }
+
+ }
+ @Test
+ public void testTransactionBatchEmptyAbort() throws Exception {
+ // 1) to partitioned table
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+ partitionVals);
+ StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());
+ DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt, connection);
+
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
+ txnBatch.beginNextTransaction();
+ txnBatch.abort();
+ Assert.assertEquals(TransactionBatch.TxnState.ABORTED
+ , txnBatch.getCurrentTransactionState());
+ txnBatch.close();
+ connection.close();
+
+ // 2) to unpartitioned table
+ endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
+ writer = new DelimitedInputWriter(fieldNames,",", endPt);
+ connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());
+
+ txnBatch = connection.fetchTransactionBatch(10, writer);
+ txnBatch.beginNextTransaction();
+ txnBatch.abort();
+ Assert.assertEquals(TransactionBatch.TxnState.ABORTED
+ , txnBatch.getCurrentTransactionState());
+ txnBatch.close();
+ connection.close();
+ }
+
+ @Test
+ public void testTransactionBatchCommit_Delimited() throws Exception {
+ testTransactionBatchCommit_Delimited(null);
+ }
+ @Test
+ public void testTransactionBatchCommit_DelimitedUGI() throws Exception {
+ testTransactionBatchCommit_Delimited(Utils.getUGI());
+ }
+ private void testTransactionBatchCommit_Delimited(UserGroupInformation ugi) throws Exception {
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+ partitionVals);
+ StreamingConnection connection = endPt.newConnection(true, conf, ugi, "UT_" + Thread.currentThread().getName());
+ DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt, conf, connection);
+
+ // 1st Txn
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
+ txnBatch.beginNextTransaction();
+ Assert.assertEquals(TransactionBatch.TxnState.OPEN
+ , txnBatch.getCurrentTransactionState());
+ txnBatch.write("1,Hello streaming".getBytes());
+ txnBatch.commit();
+
+ checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+
+ Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+ , txnBatch.getCurrentTransactionState());
+
+ // 2nd Txn
+ txnBatch.beginNextTransaction();
+ Assert.assertEquals(TransactionBatch.TxnState.OPEN
+ , txnBatch.getCurrentTransactionState());
+ txnBatch.write("2,Welcome to streaming".getBytes());
+
+ // data should not be visible
+ checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+
+ txnBatch.commit();
+
+ checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
+ "{2, Welcome to streaming}");
+
+ txnBatch.close();
+ Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
+ , txnBatch.getCurrentTransactionState());
+
+
+ connection.close();
+
+
+ // To Unpartitioned table
+ endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
+ connection = endPt.newConnection(true, conf, ugi, "UT_" + Thread.currentThread().getName());
+ writer = new DelimitedInputWriter(fieldNames,",", endPt, conf, connection);
+
+ // 1st Txn
+ txnBatch = connection.fetchTransactionBatch(10, writer);
+ txnBatch.beginNextTransaction();
+ Assert.assertEquals(TransactionBatch.TxnState.OPEN
+ , txnBatch.getCurrentTransactionState());
+ txnBatch.write("1,Hello streaming".getBytes());
+ txnBatch.commit();
+
+ Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+ , txnBatch.getCurrentTransactionState());
+ connection.close();
+ }
+
+ @Test
+ public void testTransactionBatchCommit_Regex() throws Exception {
+ testTransactionBatchCommit_Regex(null);
+ }
+ @Test
+ public void testTransactionBatchCommit_RegexUGI() throws Exception {
+ testTransactionBatchCommit_Regex(Utils.getUGI());
+ }
+ private void testTransactionBatchCommit_Regex(UserGroupInformation ugi) throws Exception {
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+ partitionVals);
+ StreamingConnection connection = endPt.newConnection(true, conf, ugi, "UT_" + Thread.currentThread().getName());
+ String regex = "([^,]*),(.*)";
+ StrictRegexWriter writer = new StrictRegexWriter(regex, endPt, conf, connection);
+
+ // 1st Txn
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
+ txnBatch.beginNextTransaction();
+ Assert.assertEquals(TransactionBatch.TxnState.OPEN
+ , txnBatch.getCurrentTransactionState());
+ txnBatch.write("1,Hello streaming".getBytes());
+ txnBatch.commit();
+
+ checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+
+ Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+ , txnBatch.getCurrentTransactionState());
+
+ // 2nd Txn
+ txnBatch.beginNextTransaction();
+ Assert.assertEquals(TransactionBatch.TxnState.OPEN
+ , txnBatch.getCurrentTransactionState());
+ txnBatch.write("2,Welcome to streaming".getBytes());
+
+ // data should not be visible
+ checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+
+ txnBatch.commit();
+
+ checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
+ "{2, Welcome to streaming}");
+
+ txnBatch.close();
+ Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
+ , txnBatch.getCurrentTransactionState());
+
+
+ connection.close();
+
+
+ // To Unpartitioned table
+ endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
+ connection = endPt.newConnection(true, conf, ugi, "UT_" + Thread.currentThread().getName());
+ regex = "([^:]*):(.*)";
+ writer = new StrictRegexWriter(regex, endPt, conf, connection);
+
+ // 1st Txn
+ txnBatch = connection.fetchTransactionBatch(10, writer);
+ txnBatch.beginNextTransaction();
+ Assert.assertEquals(TransactionBatch.TxnState.OPEN
+ , txnBatch.getCurrentTransactionState());
+ txnBatch.write("1:Hello streaming".getBytes());
+ txnBatch.commit();
+
+ Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+ , txnBatch.getCurrentTransactionState());
+ connection.close();
+ }
+
+ @Test
+ public void testTransactionBatchCommit_Json() throws Exception {
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+ partitionVals);
+ StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());
+ StrictJsonWriter writer = new StrictJsonWriter(endPt, connection);
+
+ // 1st Txn
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
+ txnBatch.beginNextTransaction();
+ Assert.assertEquals(TransactionBatch.TxnState.OPEN
+ , txnBatch.getCurrentTransactionState());
+ String rec1 = "{\"id\" : 1, \"msg\": \"Hello streaming\"}";
+ txnBatch.write(rec1.getBytes());
+ txnBatch.commit();
+
+ checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+
+ Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+ , txnBatch.getCurrentTransactionState());
+
+ txnBatch.close();
+ Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
+ , txnBatch.getCurrentTransactionState());
+
+ connection.close();
+ List<String> rs = queryTable(driver, "select * from " + dbName + "." + tblName);
+ Assert.assertEquals(1, rs.size());
+ }
+
+ @Test
+ public void testRemainingTransactions() throws Exception {
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+ partitionVals);
+ DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
+ StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());
+
+ // 1) test with txn.Commit()
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
+ int batch=0;
+ int initialCount = txnBatch.remainingTransactions();
+ while (txnBatch.remainingTransactions()>0) {
+ txnBatch.beginNextTransaction();
+ Assert.assertEquals(--initialCount, txnBatch.remainingTransactions());
+ for (int rec=0; rec<2; ++rec) {
+ Assert.assertEquals(TransactionBatch.TxnState.OPEN
+ , txnBatch.getCurrentTransactionState());
+ txnBatch.write((batch * rec + ",Hello streaming").getBytes());
+ }
+ txnBatch.commit();
+ Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+ , txnBatch.getCurrentTransactionState());
+ ++batch;
+ }
+ Assert.assertEquals(0, txnBatch.remainingTransactions());
+ txnBatch.close();
+
+ Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
+ , txnBatch.getCurrentTransactionState());
+
+ // 2) test with txn.Abort()
+ txnBatch = connection.fetchTransactionBatch(10, writer);
+ batch=0;
+ initialCount = txnBatch.remainingTransactions();
+ while (txnBatch.remainingTransactions()>0) {
+ txnBatch.beginNextTransaction();
+ Assert.assertEquals(--initialCount,txnBatch.remainingTransactions());
+ for (int rec=0; rec<2; ++rec) {
+ Assert.assertEquals(TransactionBatch.TxnState.OPEN
+ , txnBatch.getCurrentTransactionState());
+ txnBatch.write((batch * rec + ",Hello streaming").getBytes());
+ }
+ txnBatch.abort();
+ Assert.assertEquals(TransactionBatch.TxnState.ABORTED
+ , txnBatch.getCurrentTransactionState());
+ ++batch;
+ }
+ Assert.assertEquals(0, txnBatch.remainingTransactions());
+ txnBatch.close();
+
+ Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
+ , txnBatch.getCurrentTransactionState());
+
+ connection.close();
+ }
+
+ @Test
+ public void testTransactionBatchAbort() throws Exception {
+
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+ partitionVals);
+ StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+ DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt, connection);
+
+
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
+ txnBatch.beginNextTransaction();
+ txnBatch.write("1,Hello streaming".getBytes());
+ txnBatch.write("2,Welcome to streaming".getBytes());
+ txnBatch.abort();
+
+ checkNothingWritten(partLoc);
+
+ Assert.assertEquals(TransactionBatch.TxnState.ABORTED
+ , txnBatch.getCurrentTransactionState());
+
+ txnBatch.close();
+ connection.close();
+
+ checkNothingWritten(partLoc);
+
+ }
+
+
+ @Test
+ public void testTransactionBatchAbortAndCommit() throws Exception {
+ String agentInfo = "UT_" + Thread.currentThread().getName();
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+ partitionVals);
+ StreamingConnection connection = endPt.newConnection(false, agentInfo);
+ DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt, connection);
+
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
+ txnBatch.beginNextTransaction();
+ txnBatch.write("1,Hello streaming".getBytes());
+ txnBatch.write("2,Welcome to streaming".getBytes());
+ ShowLocksResponse resp = msClient.showLocks(new ShowLocksRequest());
+ Assert.assertEquals("LockCount", 1, resp.getLocksSize());
+ Assert.assertEquals("LockType", LockType.SHARED_READ, resp.getLocks().get(0).getType());
+ Assert.assertEquals("LockState", LockState.ACQUIRED, resp.getLocks().get(0).getState());
+ Assert.assertEquals("AgentInfo", agentInfo, resp.getLocks().get(0).getAgentInfo());
+ txnBatch.abort();
+
+ checkNothingWritten(partLoc);
+
+ Assert.assertEquals(TransactionBatch.TxnState.ABORTED
+ , txnBatch.getCurrentTransactionState());
+
+ txnBatch.beginNextTransaction();
+ txnBatch.write("1,Hello streaming".getBytes());
+ txnBatch.write("2,Welcome to streaming".getBytes());
+ txnBatch.commit();
+
+ checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
+ "{2, Welcome to streaming}");
+
+ txnBatch.close();
+ connection.close();
+ }
+
+ @Test
+ public void testMultipleTransactionBatchCommits() throws Exception {
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+ partitionVals);
+ DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
+ StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());
+
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
+ txnBatch.beginNextTransaction();
+ txnBatch.write("1,Hello streaming".getBytes());
+ txnBatch.commit();
+ String validationQuery = "select id, msg from " + dbName + "." + tblName + " order by id, msg";
+ checkDataWritten2(partLoc, 1, 10, 1, validationQuery, false, "1\tHello streaming");
+
+ txnBatch.beginNextTransaction();
+ txnBatch.write("2,Welcome to streaming".getBytes());
+ txnBatch.commit();
+
+ checkDataWritten2(partLoc, 1, 10, 1, validationQuery, true, "1\tHello streaming",
+ "2\tWelcome to streaming");
+
+ txnBatch.close();
+
+ // 2nd Txn Batch
+ txnBatch = connection.fetchTransactionBatch(10, writer);
+ txnBatch.beginNextTransaction();
+ txnBatch.write("3,Hello streaming - once again".getBytes());
+ txnBatch.commit();
+
+ checkDataWritten2(partLoc, 1, 20, 2, validationQuery, false, "1\tHello streaming",
+ "2\tWelcome to streaming", "3\tHello streaming - once again");
+
+ txnBatch.beginNextTransaction();
+ txnBatch.write("4,Welcome to streaming - once again".getBytes());
+ txnBatch.commit();
+
+ checkDataWritten2(partLoc, 1, 20, 2, validationQuery, true, "1\tHello streaming",
+ "2\tWelcome to streaming", "3\tHello streaming - once again",
+ "4\tWelcome to streaming - once again");
+
+ Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+ , txnBatch.getCurrentTransactionState());
+
+ txnBatch.close();
+
+ connection.close();
+ }
+
+ @Test
+ public void testInterleavedTransactionBatchCommits() throws Exception {
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+ partitionVals);
+ DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames, ",", endPt);
+ StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+
+ // Acquire 1st Txn Batch
+ TransactionBatch txnBatch1 = connection.fetchTransactionBatch(10, writer);
+ txnBatch1.beginNextTransaction();
+
+ // Acquire 2nd Txn Batch
+ DelimitedInputWriter writer2 = new DelimitedInputWriter(fieldNames, ",", endPt);
+ TransactionBatch txnBatch2 = connection.fetchTransactionBatch(10, writer2);
+ txnBatch2.beginNextTransaction();
+
+ // Interleaved writes to both batches
+ txnBatch1.write("1,Hello streaming".getBytes());
+ txnBatch2.write("3,Hello streaming - once again".getBytes());
+
+ checkNothingWritten(partLoc);
+
+ txnBatch2.commit();
+
+ String validationQuery = "select id, msg from " + dbName + "." + tblName + " order by id, msg";
+ checkDataWritten2(partLoc, 11, 20, 1,
+ validationQuery, true, "3\tHello streaming - once again");
+
+ txnBatch1.commit();
+ /*now both batches have committed (but not closed), so for each primary file we expect a side
+ file to exist that indicates the true length of the primary file*/
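+ // (the side file is the *_flush_length file written next to each bucket file; see OrcAcidUtils.getSideFile below)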
+ FileSystem fs = partLoc.getFileSystem(conf);
+ AcidUtils.Directory dir = AcidUtils.getAcidState(partLoc, conf,
+ msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName)));
+ for(AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
+ for(FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
+ Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());
+ Assert.assertTrue(lengthFile + " missing", fs.exists(lengthFile));
+ long lengthFileSize = fs.getFileStatus(lengthFile).getLen();
+ Assert.assertTrue("Expected " + lengthFile + " to be non empty. lengh=" +
+ lengthFileSize, lengthFileSize > 0);
+ long logicalLength = AcidUtils.getLogicalLength(fs, stat);
+ long actualLength = stat.getLen();
+ Assert.assertTrue("", logicalLength == actualLength);
+ }
+ }
+ checkDataWritten2(partLoc, 1, 20, 2,
+ validationQuery, false,"1\tHello streaming", "3\tHello streaming - once again");
+
+ txnBatch1.beginNextTransaction();
+ txnBatch1.write("2,Welcome to streaming".getBytes());
+
+ txnBatch2.beginNextTransaction();
+ txnBatch2.write("4,Welcome to streaming - once again".getBytes());
+ //here each batch has written data and committed (to bucket0 since the table only has 1 bucket)
+ //so each of the 2 deltas has 1 bucket0 and 1 bucket0_flush_length. Furthermore, each bucket0
+ //has now received more data (logically - it's buffered) but it is not yet committed.
+ //let's check that side files exist, etc
+ dir = AcidUtils.getAcidState(partLoc, conf, msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName)));
+ for(AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
+ for(FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
+ Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());
+ Assert.assertTrue(lengthFile + " missing", fs.exists(lengthFile));
+ long lengthFileSize = fs.getFileStatus(lengthFile).getLen();
+ Assert.assertTrue("Expected " + lengthFile + " to be non empty. lengh=" +
+ lengthFileSize, lengthFileSize > 0);
+ long logicalLength = AcidUtils.getLogicalLength(fs, stat);
+ long actualLength = stat.getLen();
+ Assert.assertTrue("", logicalLength <= actualLength);
+ }
+ }
+ checkDataWritten2(partLoc, 1, 20, 2,
+ validationQuery, true,"1\tHello streaming", "3\tHello streaming - once again");
+
+ txnBatch1.commit();
+
+ checkDataWritten2(partLoc, 1, 20, 2,
+ validationQuery, false, "1\tHello streaming",
+ "2\tWelcome to streaming",
+ "3\tHello streaming - once again");
+
+ txnBatch2.commit();
+
+ checkDataWritten2(partLoc, 1, 20, 2,
+ validationQuery, true, "1\tHello streaming",
+ "2\tWelcome to streaming",
+ "3\tHello streaming - once again",
+ "4\tWelcome to streaming - once again");
+
+ Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+ , txnBatch1.getCurrentTransactionState());
+ Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+ , txnBatch2.getCurrentTransactionState());
+
+ txnBatch1.close();
+ txnBatch2.close();
+
+ connection.close();
+ }
+
+ private static class WriterThd extends Thread {
+
+ private final StreamingConnection conn;
+ private final DelimitedInputWriter writer;
+ private final String data;
+ private Throwable error;
+
+ WriterThd(HiveEndPoint ep, String data) throws Exception {
+ super("Writer_" + data);
+ writer = new DelimitedInputWriter(fieldNames, ",", ep);
+ conn = ep.newConnection(false, "UT_" + Thread.currentThread().getName());
+ this.data = data;
+ setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(Thread thread, Throwable throwable) {
+ error = throwable;
+ LOG.error("Thread " + thread.getName() + " died: " + throwable.getMessage(), throwable);
+ }
+ });
+ }
+
+ @Override
+ public void run() {
+ TransactionBatch txnBatch = null;
+ try {
+ txnBatch = conn.fetchTransactionBatch(10, writer);
+ while (txnBatch.remainingTransactions() > 0) {
+ txnBatch.beginNextTransaction();
+ txnBatch.write(data.getBytes());
+ txnBatch.write(data.getBytes());
+ txnBatch.commit();
+ } // while
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ if (txnBatch != null) {
+ try {
+ txnBatch.close();
+ } catch (Exception e) {
+ LOG.error("txnBatch.close() failed: " + e.getMessage(), e);
+ conn.close();
+ }
+ }
+ try {
+ conn.close();
+ } catch (Exception e) {
+ LOG.error("conn.close() failed: " + e.getMessage(), e);
+ }
+
+ }
+ }
+ }
+
+ @Test
+ public void testConcurrentTransactionBatchCommits() throws Exception {
+ final HiveEndPoint ep = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
+ List<WriterThd> writers = new ArrayList<WriterThd>(3);
+ writers.add(new WriterThd(ep, "1,Matrix"));
+ writers.add(new WriterThd(ep, "2,Gandhi"));
+ writers.add(new WriterThd(ep, "3,Silence"));
+
+ for(WriterThd w : writers) {
+ w.start();
+ }
+ for(WriterThd w : writers) {
+ w.join();
+ }
+ for(WriterThd w : writers) {
+ if(w.error != null) {
+ Assert.assertFalse("Writer thread" + w.getName() + " died: " + w.error.getMessage() +
+ " See log file for stack trace", true);
+ }
+ }
+ }
+
+
+ private ArrayList<SampleRec> dumpBucket(Path orcFile) throws IOException {
+ org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem.getLocal(new Configuration());
+ Reader reader = OrcFile.createReader(orcFile,
+ OrcFile.readerOptions(conf).filesystem(fs));
+
+ RecordReader rows = reader.rows();
+ StructObjectInspector inspector = (StructObjectInspector) reader
+ .getObjectInspector();
+
+ System.out.format("Found Bucket File : %s \n", orcFile.getName());
+ ArrayList<SampleRec> result = new ArrayList<SampleRec>();
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ SampleRec rec = (SampleRec) deserializeDeltaFileRow(row, inspector)[5];
+ result.add(rec);
+ }
+
+ return result;
+ }
+
+ // Assumes stored data schema = [acid fields],string,int,string
+ // return array of 6 fields, where the last field has the actual data
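+ // (the acid fields read below are the standard ACID metadata columns: operation, original write id,
+ // bucket, row id and current write id, followed by the row struct itself)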
+ private static Object[] deserializeDeltaFileRow(Object row, StructObjectInspector inspector) {
+ List<? extends StructField> fields = inspector.getAllStructFieldRefs();
+
+ WritableIntObjectInspector f0ins = (WritableIntObjectInspector) fields.get(0).getFieldObjectInspector();
+ WritableLongObjectInspector f1ins = (WritableLongObjectInspector) fields.get(1).getFieldObjectInspector();
+ WritableIntObjectInspector f2ins = (WritableIntObjectInspector) fields.get(2).getFieldObjectInspector();
+ WritableLongObjectInspector f3ins = (WritableLongObjectInspector) fields.get(3).getFieldObjectInspector();
+ WritableLongObjectInspector f4ins = (WritableLongObjectInspector) fields.get(4).getFieldObjectInspector();
+ StructObjectInspector f5ins = (StructObjectInspector) fields.get(5).getFieldObjectInspector();
+
+ int f0 = f0ins.get(inspector.getStructFieldData(row, fields.get(0)));
+ long f1 = f1ins.get(inspector.getStructFieldData(row, fields.get(1)));
+ int f2 = f2ins.get(inspector.getStructFieldData(row, fields.get(2)));
+ long f3 = f3ins.get(inspector.getStructFieldData(row, fields.get(3)));
+ long f4 = f4ins.get(inspector.getStructFieldData(row, fields.get(4)));
+ SampleRec f5 = deserializeInner(inspector.getStructFieldData(row, fields.get(5)), f5ins);
+
+ return new Object[] {f0, f1, f2, f3, f4, f5};
+ }
+
+ // Assumes row schema => string,int,string
+ private static SampleRec deserializeInner(Object row, StructObjectInspector inspector) {
+ List<? extends StructField> fields = inspector.getAllStructFieldRefs();
+
+ WritableStringObjectInspector f0ins = (WritableStringObjectInspector) fields.get(0).getFieldObjectInspector();
+ WritableIntObjectInspector f1ins = (WritableIntObjectInspector) fields.get(1).getFieldObjectInspector();
+ WritableStringObjectInspector f2ins = (WritableStringObjectInspector) fields.get(2).getFieldObjectInspector();
+
+ String f0 = f0ins.getPrimitiveJavaObject(inspector.getStructFieldData(row, fields.get(0)));
+ int f1 = f1ins.get(inspector.getStructFieldData(row, fields.get(1)));
+ String f2 = f2ins.getPrimitiveJavaObject(inspector.getStructFieldData(row, fields.get(2)));
+ return new SampleRec(f0, f1, f2);
+ }
+
+ @Test
+ public void testBucketing() throws Exception {
+ String agentInfo = "UT_" + Thread.currentThread().getName();
+ dropDB(msClient, dbName3);
+ dropDB(msClient, dbName4);
+
+ // 1) Create two bucketed tables
+ String dbLocation = dbFolder.newFolder(dbName3).getCanonicalPath() + ".db";
+ dbLocation = dbLocation.replaceAll("\\\\","/"); // for windows paths
+ String[] colNames = "key1,key2,data".split(",");
+ String[] colTypes = "string,int,string".split(",");
+ String[] bucketNames = "key1,key2".split(",");
+ int bucketCount = 4;
+ createDbAndTable(driver, dbName3, tblName3, null, colNames, colTypes, bucketNames
+ , null, dbLocation, bucketCount);
+
+ String dbLocation2 = dbFolder.newFolder(dbName4).getCanonicalPath() + ".db";
+ dbLocation2 = dbLocation2.replaceAll("\\\\","/"); // for windows paths
+ String[] colNames2 = "key3,key4,data2".split(",");
+ String[] colTypes2 = "string,int,string".split(",");
+ String[] bucketNames2 = "key3,key4".split(",");
+ createDbAndTable(driver, dbName4, tblName4, null, colNames2, colTypes2, bucketNames2
+ , null, dbLocation2, bucketCount);
+
+
+ // 2) Insert data into both tables
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null);
+ StreamingConnection connection = endPt.newConnection(false, agentInfo);
+ DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt, connection);
+
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
+ txnBatch.beginNextTransaction();
+ txnBatch.write("name0,1,Hello streaming".getBytes());
+ txnBatch.write("name2,2,Welcome to streaming".getBytes());
+ txnBatch.write("name4,2,more Streaming unlimited".getBytes());
+ txnBatch.write("name5,2,even more Streaming unlimited".getBytes());
+ txnBatch.commit();
+
+
+ HiveEndPoint endPt2 = new HiveEndPoint(metaStoreURI, dbName4, tblName4, null);
+ StreamingConnection connection2 = endPt2.newConnection(false, agentInfo);
+ DelimitedInputWriter writer2 = new DelimitedInputWriter(colNames2,",", endPt2, connection2);
+ TransactionBatch txnBatch2 = connection2.fetchTransactionBatch(2, writer2);
+ txnBatch2.beginNextTransaction();
+
+ txnBatch2.write("name5,2,fact3".getBytes()); // bucket 0
+ txnBatch2.write("name8,2,fact3".getBytes()); // bucket 1
+ txnBatch2.write("name0,1,fact1".getBytes()); // bucket 2
+
+ txnBatch2.commit();
+
+ // 3) Check data distribution in buckets
+
+ HashMap<Integer, ArrayList<SampleRec>> actual1 = dumpAllBuckets(dbLocation, tblName3);
+ HashMap<Integer, ArrayList<SampleRec>> actual2 = dumpAllBuckets(dbLocation2, tblName4);
+ System.err.println("\n Table 1");
+ System.err.println(actual1);
+ System.err.println("\n Table 2");
+ System.err.println(actual2);
+
+ // assert bucket listing is as expected
+ Assert.assertEquals("number of buckets does not match expectation", actual1.values().size(), 3);
+ Assert.assertEquals("records in bucket does not match expectation", actual1.get(0).size(), 2);
+ Assert.assertEquals("records in bucket does not match expectation", actual1.get(1).size(), 1);
+ Assert.assertTrue("bucket 2 shouldn't have been created", actual1.get(2) == null);
+ Assert.assertEquals("records in bucket does not match expectation", actual1.get(3).size(), 1);
+ }
+ private void runCmdOnDriver(String cmd) throws QueryFailedException {
+ boolean t = runDDL(driver, cmd);
+ Assert.assertTrue(cmd + " failed", t);
+ }
+
+
+ @Test
+ public void testFileDump() throws Exception {
+ String agentInfo = "UT_" + Thread.currentThread().getName();
+ dropDB(msClient, dbName3);
+ dropDB(msClient, dbName4);
+
+ // 1) Create two bucketed tables
+ String dbLocation = dbFolder.newFolder(dbName3).getCanonicalPath() + ".db";
+ dbLocation = dbLocation.replaceAll("\\\\","/"); // for windows paths
+ String[] colNames = "key1,key2,data".split(",");
+ String[] colTypes = "string,int,string".split(",");
+ String[] bucketNames = "key1,key2".split(",");
+ int bucketCount = 4;
+ createDbAndTable(driver, dbName3, tblName3, null, colNames, colTypes, bucketNames
+ , null, dbLocation, bucketCount);
+
+ String dbLocation2 = dbFolder.newFolder(dbName4).getCanonicalPath() + ".db";
+ dbLocation2 = dbLocation2.replaceAll("\\\\","/"); // for windows paths
+ String[] colNames2 = "key3,key4,data2".split(",");
+ String[] colTypes2 = "string,int,string".split(",");
+ String[] bucketNames2 = "key3,key4".split(",");
+ createDbAndTable(driver, dbName4, tblName4, null, colNames2, colTypes2, bucketNames2
+ , null, dbLocation2, bucketCount);
+
+
+ // 2) Insert data into both tables
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null);
+ StreamingConnection connection = endPt.newConnection(false, agentInfo);
+ DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt, connection);
+
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
+ txnBatch.beginNextTransaction();
+ txnBatch.write("name0,1,Hello streaming".getBytes());
+ txnBatch.write("name2,2,Welcome to streaming".getBytes());
+ txnBatch.write("name4,2,more Streaming unlimited".getBytes());
+ txnBatch.write("name5,2,even more Streaming unlimited".getBytes());
+ txnBatch.commit();
+
+ PrintStream origErr = System.err;
+ ByteArrayOutputStream myErr = new ByteArrayOutputStream();
+
+ // replace stderr and run command
+ System.setErr(new PrintStream(myErr));
+ FileDump.main(new String[]{dbLocation});
+ System.err.flush();
+ System.setErr(origErr);
+
+ String errDump = new String(myErr.toByteArray());
+ Assert.assertEquals(false, errDump.contains("file(s) are corrupted"));
+ // since this test runs on the local file system, which does not have an API to tell whether
+ // files are open or not, we are testing for the negative case even though the bucket files
+ // are still open for writes (transaction batch not closed yet)
+ Assert.assertEquals(false, errDump.contains("is still open for writes."));
+
+ HiveEndPoint endPt2 = new HiveEndPoint(metaStoreURI, dbName4, tblName4, null);
+ DelimitedInputWriter writer2 = new DelimitedInputWriter(colNames2,",", endPt2);
+ StreamingConnection connection2 = endPt2.newConnection(false, agentInfo);
+ TransactionBatch txnBatch2 = connection2.fetchTransactionBatch(2, writer2);
+ txnBatch2.beginNextTransaction();
+
+ txnBatch2.write("name5,2,fact3".getBytes()); // bucket 0
+ txnBatch2.write("name8,2,fact3".getBytes()); // bucket 1
+ txnBatch2.write("name0,1,fact1".getBytes()); // bucket 2
+ // no data for bucket 3 -- expect 0 length bucket file
+
+ txnBatch2.commit();
+
+ origErr = System.err;
+ myErr = new ByteArrayOutputStream();
+
+ // replace stderr and run command
+ System.setErr(new PrintStream(myErr));
+ FileDump.main(new String[]{dbLocation});
+ System.out.flush();
+ System.err.flush();
+ System.setErr(origErr);
+
+ errDump = new String(myErr.toByteArray());
+ Assert.assertEquals(false, errDump.contains("Exception"));
+ Assert.assertEquals(false, errDump.contains("file(s) are corrupted"));
+ Assert.assertEquals(false, errDump.contains("is still open for writes."));
+ }
+
+ @Test
+ public void testFileDumpCorruptDataFiles() throws Exception {
+ dropDB(msClient, dbName3);
+
+ // 1) Create a bucketed table
+ String dbLocation = dbFolder.newFolder(dbName3).getCanonicalPath() + ".db";
+ dbLocation = dbLocation.replaceAll("\\\\","/"); // for windows paths
+ String[] colNames = "key1,key2,data".split(",");
+ String[] colTypes = "string,int,string".split(",");
+ String[] bucketNames = "key1,key2".split(",");
+ int bucketCount = 4;
+ createDbAndTable(driver, dbName3, tblName3, null, colNames, colTypes, bucketNames
+ , null, dbLocation, bucketCount);
+
+ // 2) Insert data into the table
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null);
+ StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+ DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt, connection);
+
+ // we need a side file for this test, so we fetch a batch of 2 transactions and use only one
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
+ txnBatch.beginNextTransaction();
+ txnBatch.write("name0,1,Hello streaming".getBytes());
+ txnBatch.write("name2,2,Welcome to streaming".getBytes());
+ txnBatch.write("name4,2,more Streaming unlimited".getBytes());
+ txnBatch.write("name5,2,even more Streaming unlimited".getBytes());
+ txnBatch.commit();
+
+ // intentionally corrupt some files
+ Path path = new Path(dbLocation);
+ Collection<String> files = FileDump.getAllFilesInPath(path, conf);
+ int readableFooter = -1;
+ for (String file : files) {
+ if (file.contains("bucket_00000")) {
+ // empty out the file
+ corruptDataFile(file, conf, Integer.MIN_VALUE);
+ } else if (file.contains("bucket_00001")) {
+ corruptDataFile(file, conf, -1);
+ } else if (file.contains("bucket_00002")) {
+ Assert.assertFalse("bucket 2 shouldn't have been created", true);
+ } else if (file.contains("bucket_00003")) {
+ corruptDataFile(file, conf, 100);
+ }
+ }
+
+ PrintStream origErr = System.err;
+ ByteArrayOutputStream myErr = new ByteArrayOutputStream();
+
+ // replace stderr and run command
+ System.setErr(new PrintStream(myErr));
+ FileDump.main(new String[]{dbLocation});
+ System.err.flush();
+ System.setErr(origErr);
+
+ String errDump = new String(myErr.toByteArray());
+ Assert.assertEquals(false, errDump.contains("Exception"));
+ Assert.assertEquals(true, errDump.contains("3 file(s) are corrupted"));
+ Assert.assertEquals(false, errDump.contains("is still open for writes."));
+
+ origErr = System.err;
+ myErr = new ByteArrayOutputStream();
+
+ // replace stderr and run command
+ System.setErr(new PrintStream(myErr));
+ FileDump.main(new String[]{dbLocation, "--recover", "--skip-dump"});
+ System.err.flush();
+ System.setErr(origErr);
+
+ errDump = new String(myErr.toByteArray());
+ Assert.assertEquals(true, errDump.contains("bucket_00000 recovered successfully!"));
+ Assert.assertEquals(true, errDump.contains("No readable footers found. Creating empty orc file."));
+ Assert.assertEquals(true, errDump.contains("bucket_00001 recovered successfully!"));
+ Assert.assertEquals(true, errDump.contains("bucket_00003 recovered successfully!"));
+ Assert.assertEquals(false, errDump.contains("Exception"));
+ Assert.assertEquals(false, errDump.contains("is still open for writes."));
+
+ // test after recovery
+ origErr = System.err;
+ myErr = new ByteArrayOutputStream();
+
+ // replace stderr and run command
+ System.setErr(new PrintStream(myErr));
+ FileDump.main(new String[]{dbLocation});
+ System.err.flush();
+ System.setErr(origErr);
+
+ errDump = new String(myErr.toByteArray());
+ Assert.assertEquals(false, errDump.contains("Exception"));
+ Assert.assertEquals(false, errDump.contains("file(s) are corrupted"));
+ Assert.assertEquals(false, errDump.contains("is still open for writes."));
+
+ // after recovery there shouldn't be any *_flush_length files
+ files = FileDump.getAllFilesInPath(path, conf);
+ for (String file : files) {
+ Assert.assertEquals(false, file.contains("_flush_length"));
+ }
+
+ txnBatch.close();
+ }
+
+ private void corruptDataFile(final String file, final Configuration conf, final int addRemoveBytes)
+ throws Exception {
+ Path bPath = new Path(file);
+ Path cPath = new Path(bPath.getParent(), bPath.getName() + ".corrupt");
+ FileSystem fs = bPath.getFileSystem(conf);
+ FileStatus fileStatus = fs.getFileStatus(bPath);
+ int len = addRemoveBytes == Integer.MIN_VALUE ? 0 : (int) fileStatus.getLen() + addRemoveBytes;
+ byte[] buffer = new byte[len];
+ FSDataInputStream fdis = fs.open(bPath);
+ fdis.readFully(0, buffer, 0, (int) Math.min(fileStatus.getLen(), buffer.length));
+ fdis.close();
+ FSDataOutputStream fdos = fs.create(cPath, true);
+ fdos.write(buffer, 0, buffer.length);
+ fdos.close();
+ fs.delete(bPath, false);
+ fs.rename(cPath, bPath);
+ }
+
+ @Test
+ public void testFileDumpCorruptSideFiles() throws Exception {
+ dropDB(msClient, dbName3);
+
+ // 1) Create a bucketed table
+ String dbLocation = dbFolder.newFolder(dbName3).getCanonicalPath() + ".db";
+ dbLocation = dbLocation.replaceAll("\\\\","/"); // for windows paths
+ String[] colNames = "key1,key2,data".split(",");
+ String[] colTypes = "string,int,string".split(",");
+ String[] bucketNames = "key1,key2".split(",");
+ int bucketCount = 4;
+ createDbAndTable(driver, dbName3, tblName3, null, colNames, colTypes, bucketNames
+ , null, dbLocation, bucketCount);
+
+ // 2) Insert data into the table
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null);
+ StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+ DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt, connection);
+
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
+ txnBatch.beginNextTransaction();
+ txnBatch.write("name0,1,Hello streaming".getBytes());
+ txnBatch.write("name2,2,Welcome to streaming".getBytes());
+ txnBatch.write("name4,2,more Streaming unlimited".getBytes());
+ txnBatch.write("name5,2,even more Streaming unlimited".getBytes());
+ txnBatch.write("name6,3,aHello streaming".getBytes());
+ txnBatch.commit();
+
+ Map<String,List<Long>> offsetMap = new HashMap<String,List<Long>>();
+ recordOffsets(conf, dbLocation, offsetMap);
+
+ txnBatch.beginNextTransaction();
+ txnBatch.write("name01,11,-Hello streaming".getBytes());
+ txnBatch.write("name21,21,-Welcome to streaming".getBytes());
+ txnBatch.write("name41,21,-more Streaming unlimited".getBytes());
+ txnBatch.write("name51,21,-even more Streaming unlimited".getBytes());
+ txnBatch.write("name02,12,--Hello streaming".getBytes());
+ txnBatch.write("name22,22,--Welcome to streaming".getBytes());
+ txnBatch.write("name42,22,--more Streaming unlimited".getBytes());
+ txnBatch.write("name52,22,--even more Streaming unlimited".getBytes());
+ txnBatch.write("name7,4,aWelcome to streaming".getBytes());
+ txnBatch.write("name8,5,amore Streaming unlimited".getBytes());
+ txnBatch.write("name9,6,aeven more Streaming unlimited".getBytes());
+ txnBatch.write("name10,7,bHello streaming".getBytes());
+ txnBatch.write("name11,8,bWelcome to streaming".getBytes());
+ txnBatch.write("name12,9,bmore Streaming unlimited".getBytes());
+ txnBatch.write("name13,10,beven more Streaming unlimited".getBytes());
+ txnBatch.commit();
+
+ recordOffsets(conf, dbLocation, offsetMap);
+
+ // intentionally corrupt some files
+ Path path = new Path(dbLocation);
+ Collection<String> files = FileDump.getAllFilesInPath(path, conf);
+ for (String file : files) {
+ if (file.contains("bucket_00000")) {
+ corruptSideFile(file, conf, offsetMap, "bucket_00000", -1); // corrupt last entry
+ } else if (file.contains("bucket_00001")) {
+ corruptSideFile(file, conf, offsetMap, "bucket_00001", 0); // empty out side file
+ } else if (file.contains("bucket_00002")) {
+ corruptSideFile(file, conf, offsetMap, "bucket_00002", 3); // total 3 entries (2 valid + 1 fake)
+ } else if (file.contains("bucket_00003")) {
+ corruptSideFile(file, conf, offsetMap, "bucket_00003", 10); // total 10 entries (2 valid + 8 fake)
+ }
+ }
+
+ PrintStream origErr = System.err;
+ ByteArrayOutputStream myErr = new ByteArrayOutputStream();
+
+ // replace stderr and run command
+ System.setErr(new PrintStream(myErr));
+ FileDump.main(new String[]{dbLocation});
+ System.err.flush();
+ System.setErr(origErr);
+
+ String errDump = new String(myErr.toByteArray());
+ Assert.assertEquals(true, errDump.contains("bucket_00000_flush_length [length: 11"));
+ Assert.assertEquals(true, errDump.contains("bucket_00001_flush_length [length: 0"));
+ Assert.assertEquals(true, errDump.contains("bucket_00002_flush_length [length: 24"));
+ Assert.assertEquals(true, errDump.contains("bucket_00003_flush_length [length: 80"));
+ Assert.assertEquals(false, errDump.contains("Exception"));
+ Assert.assertEquals(true, errDump.contains("4 file(s) are corrupted"));
+ Assert.assertEquals(false, errDump.contains("is still open for writes."));
+
+ origErr = System.err;
+ myErr = new ByteArrayOutputStream();
+
+ // replace stderr and run command
+ System.setErr(new PrintStream(myErr));
+ FileDump.main(new String[]{dbLocation, "--recover", "--skip-dump"});
+ System.err.flush();
+ System.setErr(origErr);
+
+ errDump = new String(myErr.toByteArray());
+ Assert.assertEquals(true, errDump.contains("bucket_00000 recovered successfully!"));
+ Assert.assertEquals(true, errDump.contains("bucket_00001 recovered successfully!"));
+ Assert.assertEquals(true, errDump.contains("bucket_00002 recovered successfully!"));
+ Assert.assertEquals(true, errDump.contains("bucket_00003 recovered successfully!"));
+ List<Long> offsets = offsetMap.get("bucket_00000");
+ Assert.assertEquals(true, errDump.contains("Readable footerOffsets: " + offsets.toString()));
+ offsets = offsetMap.get("bucket_00001");
+ Assert.assertEquals(true, errDump.contains("Readable footerOffsets: " + offsets.toString()));
+ offsets = offsetMap.get("bucket_00002");
+ Assert.assertEquals(true, errDump.contains("Readable footerOffsets: " + offsets.toString()));
+ offsets = offsetMap.get("bucket_00003");
+ Assert.assertEquals(true, errDump.contains("Readable footerOffsets: " + offsets.toString()));
+ Assert.assertEquals(false, errDump.contains("Exception"));
+ Assert.assertEquals(false, errDump.contains("is still open for writes."));
+
+ // test after recovery
+ origErr = System.err;
+ myErr = new ByteArrayOutputStream();
+
+ // replace stderr and run command
+ System.setErr(new PrintStream(myErr));
+ FileDump.main(new String[]{dbLocation});
+ System.err.flush();
+ System.setErr(origErr);
+
+ errDump = new String(myErr.toByteArray());
+ Assert.assertEquals(false, errDump.contains("Exception"));
+ Assert.assertEquals(false, errDump.contains("file(s) are corrupted"));
+ Assert.assertEquals(false, errDump.contains("is still open for writes."));
+
+ // after recovery there shouldn't be any *_flush_length files
+ files = FileDump.getAllFilesInPath(path, conf);
+ for (String file : files) {
+ Assert.assertEquals(false, file.contains("_flush_length"));
+ }
+
+ txnBatch.close();
+ }
+
+ private void corruptSideFile(final String file, final HiveConf conf,
+ final Map<String, List<Long>> offsetMap, final String key, final int numEntries)
+ throws IOException {
+ Path dataPath = new Path(file);
+ Path sideFilePath = OrcAcidUtils.getSideFile(dataPath);
+ Path cPath = new Path(sideFilePath.getParent(), sideFilePath.getName() + ".corrupt");
+ FileSystem fs = sideFilePath.getFileSystem(conf);
+ List<Long> offsets = offsetMap.get(key);
+ long lastOffset = offsets.get(offsets.size() - 1);
+ FSDataOutputStream fdos = fs.create(cPath, true);
+ // corrupt last entry
+ if (numEntries < 0) {
+ byte[] lastOffsetBytes = longToBytes(lastOffset);
+ for (int i = 0; i < offsets.size() - 1; i++) {
+ fdos.writeLong(offsets.get(i));
+ }
+
+ fdos.write(lastOffsetBytes, 0, 3);
+ } else if (numEntries > 0) {
+ int firstRun = Math.min(offsets.size(), numEntries);
+ // add original entries
+ for (int i=0; i < firstRun; i++) {
+ fdos.writeLong(offsets.get(i));
+ }
+
+ // add fake entries
+ int remaining = numEntries - firstRun;
+ for (int i = 0; i < remaining; i++) {
+ fdos.writeLong(lastOffset + ((i + 1) * 100));
+ }
+ }
+
+ fdos.close();
+ fs.delete(sideFilePath, false);
+ fs.rename(cPath, sideFilePath);
+ }
+
+ private byte[] longToBytes(long x) {
+ ByteBuffer buffer = ByteBuffer.allocate(8);
+ buffer.putLong(x);
+ return buffer.array();
+ }
+
+ private void recordOffsets(final HiveConf conf, final String dbLocation,
+ final Map<String, List<Long>> offsetMap) throws IOException {
+ Path path = new Path(dbLocation);
+ Collection<String> files = FileDump.getAllFilesInPath(path, conf);
+ String[] buckets = {"bucket_00000", "bucket_00001", "bucket_00002", "bucket_00003"};
+ for (String file : files) {
+ Path bPath = new Path(file);
+ FileSystem fs = bPath.getFileSystem(conf);
+ FileStatus fileStatus = fs.getFileStatus(bPath);
+ long len = fileStatus.getLen();
+
+ // record the current length of each bucket file so later checks can compare footer offsets
+ for (String bucket : buckets) {
+ if (file.contains(bucket)) {
+ List<Long> offsets = offsetMap.get(bucket);
+ if (offsets == null) {
+ offsets = new ArrayList<Long>();
+ offsetMap.put(bucket, offsets);
+ }
+ offsets.add(len);
+ break;
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testErrorHandling() throws Exception {
+ String agentInfo = "UT_" + Thread.currentThread().getName();
+ runCmdOnDriver("create database testErrors");
+ runCmdOnDriver("use testErrors");
+ runCmdOnDriver("create table T(a int, b int) clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testErrors", "T", null);
+ StreamingConnection connection = endPt.newConnection(false, agentInfo);
+ DelimitedInputWriter innerWriter = new DelimitedInputWriter("a,b".split(","),",", endPt, connection);
+ FaultyWriter writer = new FaultyWriter(innerWriter);
+
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
+ txnBatch.close();
+ txnBatch.heartbeat();//this is no-op on closed batch
+ txnBatch.abort();//ditto
+ GetOpenTxnsInfoResponse r = msClient.showTxns();
+ Assert.assertEquals("HWM didn't match", 17, r.getTxn_high_water_mark());
+ List<TxnInfo> ti = r.getOpen_txns();
+ Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState());
+ Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState());
+
+ Exception expectedEx = null;
+ try {
+ txnBatch.beginNextTransaction();
+ }
+ catch(IllegalStateException ex) {
+ expectedEx = ex;
+ }
+ Assert.assertTrue("beginNextTransaction() should have failed",
+ expectedEx != null && expectedEx.getMessage().contains("has been closed()"));
+ expectedEx = null;
+ try {
+ txnBatch.write("name0,1,Hello streaming".getBytes());
+ }
+ catch(IllegalStateException ex) {
+ expectedEx = ex;
+ }
+ Assert.assertTrue("write() should have failed",
+ expectedEx != null && expectedEx.getMessage().contains("has been closed()"));
+ expectedEx = null;
+ try {
+ txnBatch.commit();
+ }
+ catch(IllegalStateException ex) {
+ expectedEx = ex;
+ }
+ Assert.assertTrue("commit() should have failed",
+ expectedEx != null && expectedEx.getMessage().contains("has been closed()"));
+
+ txnBatch = connection.fetchTransactionBatch(2, writer);
+ txnBatch.beginNextTransaction();
+ txnBatch.write("name2,2,Welcome to streaming".getBytes());
+ txnBatch.write("name4,2,more Streaming unlimited".getBytes());
+ txnBatch.write("name5,2,even more Streaming unlimited".getBytes());
+ txnBatch.commit();
+
+ //test toString()
+ String s = txnBatch.toString();
+ Assert.assertTrue("Actual: " + s, s.contains("LastUsed " + JavaUtils.txnIdToString(txnBatch.getCurrentTxnId())));
+ Assert.assertTrue("Actual: " + s, s.contains("TxnStatus[CO]"));
+
+ expectedEx = null;
+ txnBatch.beginNextTransaction();
+ writer.enableErrors();
+ try {
+ txnBatch.write("name6,2,Doh!".getBytes());
+ }
+ catch(StreamingIOFailure ex) {
+ expectedEx = ex;
+ txnBatch.getCurrentTransactionState();
+ txnBatch.getCurrentTxnId();//test it doesn't throw ArrayIndexOutOfBounds...
+ }
+ Assert.assertTrue("Wrong exception: " + (expectedEx != null ? expectedEx.getMessage() : "?"),
+ expectedEx != null && expectedEx.getMessage().contains("Simulated fault occurred"));
+ expectedEx = null;
+ try {
+ txnBatch.commit();
+ }
+ catch(IllegalStateException ex) {
+ expectedEx = ex;
+ }
+ Assert.assertTrue("commit() should have failed",
+ expectedEx != null && expectedEx.getMessage().contains("has been closed()"));
+
+ //test toString()
+ s = txnBatch.toString();
+ Assert.assertTrue("Actual: " + s, s.contains("LastUsed " + JavaUtils.txnIdToString(txnBatch.getCurrentTxnId())));
+ Assert.assertTrue("Actual: " + s, s.contains("TxnStatus[CA]"));
+
+ r = msClient.showTxns();
+ Assert.assertEquals("HWM didn't match", 19, r.getTxn_high_water_mark());
+ ti = r.getOpen_txns();
+ Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState());
+ Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState());
+ //txnid 3 was committed and thus not open
+ Assert.assertEquals("wrong status ti(2)", TxnState.ABORTED, ti.get(2).getState());
+
+ writer.disableErrors();
+ txnBatch = connection.fetchTransactionBatch(2, writer);
+ txnBatch.beginNextTransaction();
+ txnBatch.write("name2,2,Welcome to streaming".getBytes());
+ writer.enableErrors();
+ expectedEx = null;
+ try {
+
<TRUNCATED>
[4/4] hive git commit: HIVE-19210: Create separate module for streaming ingest (Prasanth Jayachandran reviewed by Eugene Koifman)
Posted by pr...@apache.org.
HIVE-19210: Create separate module for streaming ingest (Prasanth Jayachandran reviewed by Eugene Koifman)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6bd32a0d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6bd32a0d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6bd32a0d
Branch: refs/heads/master
Commit: 6bd32a0d6a5dbcd0553ff88fd6c7b9a653e6e1eb
Parents: 16d94fb
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Sun Apr 15 23:34:12 2018 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Sun Apr 15 23:34:12 2018 -0700
----------------------------------------------------------------------
.../hive/hcatalog/streaming/package-info.java | 19 +
itests/hive-unit/pom.xml | 4 +-
.../hive/ql/txn/compactor/TestCompactor.java | 10 +-
packaging/pom.xml | 5 +
packaging/src/main/assembly/src.xml | 1 +
pom.xml | 1 +
streaming/pom.xml | 141 ++
.../hive/streaming/AbstractRecordWriter.java | 324 +++
.../apache/hive/streaming/ConnectionError.java | 35 +
.../hive/streaming/DelimitedInputWriter.java | 331 +++
.../apache/hive/streaming/HeartBeatFailure.java | 33 +
.../org/apache/hive/streaming/HiveEndPoint.java | 1117 +++++++++
.../hive/streaming/ImpersonationFailed.java | 25 +
.../apache/hive/streaming/InvalidColumn.java | 26 +
.../apache/hive/streaming/InvalidPartition.java | 28 +
.../org/apache/hive/streaming/InvalidTable.java | 38 +
.../hive/streaming/InvalidTrasactionState.java | 26 +
.../hive/streaming/PartitionCreationFailed.java | 25 +
.../hive/streaming/QueryFailedException.java | 28 +
.../org/apache/hive/streaming/RecordWriter.java | 43 +
.../hive/streaming/SerializationError.java | 26 +
.../hive/streaming/StreamingConnection.java | 57 +
.../hive/streaming/StreamingException.java | 28 +
.../hive/streaming/StreamingIOFailure.java | 31 +
.../apache/hive/streaming/StrictJsonWriter.java | 162 ++
.../hive/streaming/StrictRegexWriter.java | 189 ++
.../apache/hive/streaming/TransactionBatch.java | 125 +
.../streaming/TransactionBatchUnAvailable.java | 25 +
.../apache/hive/streaming/TransactionError.java | 29 +
.../java/org/apache/hive/streaming/package.html | 181 ++
.../streaming/TestDelimitedInputWriter.java | 73 +
.../apache/hive/streaming/TestStreaming.java | 2330 ++++++++++++++++++
32 files changed, 5509 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
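
As context for the diffs that follow: the new org.apache.hive.streaming package keeps the same client flow that the TestStreaming cases above exercise (endpoint -> connection -> writer -> transaction batch). Below is a minimal, hypothetical sketch of that flow under the new package, assuming a reachable metastore and an existing transactional, comma-delimited table; the URI, database, table, column names and agent id are illustrative only and not part of this patch.

import org.apache.hive.streaming.DelimitedInputWriter;
import org.apache.hive.streaming.HiveEndPoint;
import org.apache.hive.streaming.StreamingConnection;
import org.apache.hive.streaming.StreamingException;
import org.apache.hive.streaming.TransactionBatch;

public class StreamingIngestSketch {
  public static void main(String[] args)
      throws StreamingException, ClassNotFoundException, InterruptedException {
    // illustrative endpoint: metastore URI, db, table and partition values (null = unpartitioned)
    HiveEndPoint endPt = new HiveEndPoint("thrift://localhost:9083", "testing", "alerts", null);
    StreamingConnection connection = endPt.newConnection(false, "example-agent");
    DelimitedInputWriter writer =
        new DelimitedInputWriter(new String[]{"id", "msg"}, ",", endPt, connection);
    TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
    try {
      // consume every transaction in the batch, writing delimited records into each
      while (txnBatch.remainingTransactions() > 0) {
        txnBatch.beginNextTransaction();
        txnBatch.write("1,hello".getBytes());
        txnBatch.write("2,world".getBytes());
        txnBatch.commit();
      }
    } finally {
      txnBatch.close();
      connection.close();
    }
  }
}
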
http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package-info.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package-info.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package-info.java
new file mode 100644
index 0000000..36d6b13
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@Deprecated // use org.apache.hive.streaming instead
+package org.apache.hive.hcatalog.streaming;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/itests/hive-unit/pom.xml
----------------------------------------------------------------------
diff --git a/itests/hive-unit/pom.xml b/itests/hive-unit/pom.xml
index 05c362e..3ae7f2f 100644
--- a/itests/hive-unit/pom.xml
+++ b/itests/hive-unit/pom.xml
@@ -76,8 +76,8 @@
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.hive.hcatalog</groupId>
- <artifactId>hive-hcatalog-streaming</artifactId>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-streaming</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 5966740..b19aa23 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -70,11 +70,11 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hive.hcatalog.common.HCatUtil;
-import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
-import org.apache.hive.hcatalog.streaming.HiveEndPoint;
-import org.apache.hive.hcatalog.streaming.StreamingConnection;
-import org.apache.hive.hcatalog.streaming.StreamingException;
-import org.apache.hive.hcatalog.streaming.TransactionBatch;
+import org.apache.hive.streaming.DelimitedInputWriter;
+import org.apache.hive.streaming.HiveEndPoint;
+import org.apache.hive.streaming.StreamingConnection;
+import org.apache.hive.streaming.StreamingException;
+import org.apache.hive.streaming.TransactionBatch;
import org.apache.orc.OrcConf;
import org.junit.After;
import org.junit.Assert;
http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/packaging/pom.xml
----------------------------------------------------------------------
diff --git a/packaging/pom.xml b/packaging/pom.xml
index e2d61bd..fe1aac8 100644
--- a/packaging/pom.xml
+++ b/packaging/pom.xml
@@ -258,6 +258,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-streaming</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.hive.hcatalog</groupId>
<artifactId>hive-hcatalog-streaming</artifactId>
<version>${project.version}</version>
http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/packaging/src/main/assembly/src.xml
----------------------------------------------------------------------
diff --git a/packaging/src/main/assembly/src.xml b/packaging/src/main/assembly/src.xml
index 486fe52..c477194 100644
--- a/packaging/src/main/assembly/src.xml
+++ b/packaging/src/main/assembly/src.xml
@@ -97,6 +97,7 @@
<include>spark-client/**/*</include>
<include>storage-api/**/*</include>
<include>standalone-metastore/**/*</include>
+ <include>streaming/**/*</include>
<include>testutils/**/*</include>
<include>vector-code-gen/**/*</include>
<include>kryo-registrator/**/*</include>
http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2d30789..6c43181 100644
--- a/pom.xml
+++ b/pom.xml
@@ -50,6 +50,7 @@
<module>serde</module>
<module>service-rpc</module>
<module>service</module>
+ <module>streaming</module>
<module>llap-common</module>
<module>llap-client</module>
<module>llap-ext-client</module>
http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/pom.xml
----------------------------------------------------------------------
diff --git a/streaming/pom.xml b/streaming/pom.xml
new file mode 100644
index 0000000..b58ec01
--- /dev/null
+++ b/streaming/pom.xml
@@ -0,0 +1,141 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive</artifactId>
+ <version>3.1.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>hive-streaming</artifactId>
+ <packaging>jar</packaging>
+ <name>Hive Streaming</name>
+
+ <properties>
+ <hive.path.to.root>..</hive.path.to.root>
+ </properties>
+
+ <dependencies>
+ <!-- dependencies are always listed in sorted order by groupId, artifactId -->
+ <!-- intra-project -->
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-serde</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-metastore</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-cli</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive.hcatalog</groupId>
+ <artifactId>hive-hcatalog-core</artifactId>
+ <optional>true</optional>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <optional>true</optional>
+ <version>3.3.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <optional>true</optional>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <optional>true</optional>
+ <version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- test -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-common</artifactId>
+ <scope>test</scope>
+ <version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <sourceDirectory>${basedir}/src/java</sourceDirectory>
+ <testSourceDirectory>${basedir}/src/test</testSourceDirectory>
+ <resources>
+ </resources>
+ <plugins>
+ <!-- plugins are always listed in sorted order by groupId, artifactId -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
new file mode 100644
index 0000000..25998ae
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+
+
+public abstract class AbstractRecordWriter implements RecordWriter {
+ static final private Logger LOG = LoggerFactory.getLogger(AbstractRecordWriter.class.getName());
+
+ private final HiveConf conf;
+ private final HiveEndPoint endPoint;
+ final Table tbl;
+
+ private final IMetaStoreClient msClient;
+ final List<Integer> bucketIds;
+ private ArrayList<RecordUpdater> updaters = null;
+
+ private final int totalBuckets;
+ /**
+ * Indicates whether target table is bucketed
+ */
+ private final boolean isBucketed;
+
+ private final Path partitionPath;
+
+ private final AcidOutputFormat<?,?> outf;
+ private Object[] bucketFieldData; // Pre-allocated in constructor. Updated on each write.
+ private Long curBatchMinWriteId;
+ private Long curBatchMaxWriteId;
+
+ private static final class TableWriterPair {
+ private final Table tbl;
+ private final Path partitionPath;
+ TableWriterPair(Table t, Path p) {
+ tbl = t;
+ partitionPath = p;
+ }
+ }
+ /**
+ * @deprecated As of release 1.3/2.1. Replaced by {@link #AbstractRecordWriter(HiveEndPoint, HiveConf, StreamingConnection)}
+ */
+ protected AbstractRecordWriter(HiveEndPoint endPoint, HiveConf conf)
+ throws ConnectionError, StreamingException {
+ this(endPoint, conf, null);
+ }
+ protected AbstractRecordWriter(HiveEndPoint endPoint2, HiveConf conf, StreamingConnection conn)
+ throws StreamingException {
+ this.endPoint = endPoint2;
+ this.conf = conf!=null ? conf
+ : HiveEndPoint.createHiveConf(DelimitedInputWriter.class, endPoint.metaStoreUri);
+ try {
+ msClient = HCatUtil.getHiveMetastoreClient(this.conf);
+ UserGroupInformation ugi = conn != null ? conn.getUserGroupInformation() : null;
+ if (ugi == null) {
+ this.tbl = msClient.getTable(endPoint.database, endPoint.table);
+ this.partitionPath = getPathForEndPoint(msClient, endPoint);
+ } else {
+ TableWriterPair twp = ugi.doAs(
+ new PrivilegedExceptionAction<TableWriterPair>() {
+ @Override
+ public TableWriterPair run() throws Exception {
+ return new TableWriterPair(msClient.getTable(endPoint.database, endPoint.table),
+ getPathForEndPoint(msClient, endPoint));
+ }
+ });
+ this.tbl = twp.tbl;
+ this.partitionPath = twp.partitionPath;
+ }
+ this.isBucketed = tbl.getSd().getNumBuckets() > 0;
+ /**
+ * For unbucketed tables we have exactly one RecordUpdater per AbstractRecordWriter, which
+ * ends up writing to a single file, bucket_000000.
+ * See also {@link #getBucket(Object)}
+ */
+ this.totalBuckets = isBucketed ? tbl.getSd().getNumBuckets() : 1;
+ if(isBucketed) {
+ this.bucketIds = getBucketColIDs(tbl.getSd().getBucketCols(), tbl.getSd().getCols());
+ this.bucketFieldData = new Object[bucketIds.size()];
+ }
+ else {
+ bucketIds = Collections.emptyList();
+ }
+ String outFormatName = this.tbl.getSd().getOutputFormat();
+ outf = (AcidOutputFormat<?, ?>) ReflectionUtils.newInstance(JavaUtils.loadClass(outFormatName), conf);
+ } catch(InterruptedException e) {
+ throw new StreamingException(endPoint2.toString(), e);
+ } catch (MetaException | NoSuchObjectException e) {
+ throw new ConnectionError(endPoint2, e);
+ } catch (TException | ClassNotFoundException | IOException e) {
+ throw new StreamingException(e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Used to tag error messages to provide some breadcrumbs.
+ */
+ String getWatermark() {
+ return partitionPath + " writeIds[" + curBatchMinWriteId + "," + curBatchMaxWriteId + "]";
+ }
+ // return the column numbers of the bucketed columns
+ private List<Integer> getBucketColIDs(List<String> bucketCols, List<FieldSchema> cols) {
+ ArrayList<Integer> result = new ArrayList<Integer>(bucketCols.size());
+ HashSet<String> bucketSet = new HashSet<String>(bucketCols);
+ for (int i = 0; i < cols.size(); i++) {
+ if( bucketSet.contains(cols.get(i).getName()) ) {
+ result.add(i);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Get the SerDe for the Objects created by {@link #encode}. This is public so that test
+ * frameworks can use it.
+ * @return serde
+ * @throws SerializationError
+ */
+ public abstract AbstractSerDe getSerde() throws SerializationError;
+
+ /**
+ * Encode a record as an Object that Hive can read with the ObjectInspector associated with the
+ * serde returned by {@link #getSerde}. This is public so that test frameworks can use it.
+ * @param record record to be deserialized
+ * @return deserialized record as an Object
+ * @throws SerializationError
+ */
+ public abstract Object encode(byte[] record) throws SerializationError;
+
+ protected abstract ObjectInspector[] getBucketObjectInspectors();
+ protected abstract StructObjectInspector getRecordObjectInspector();
+ protected abstract StructField[] getBucketStructFields();
+
+ // returns the bucket number to which the record belongs
+ protected int getBucket(Object row) throws SerializationError {
+ if(!isBucketed) {
+ return 0;
+ }
+ ObjectInspector[] inspectors = getBucketObjectInspectors();
+ Object[] bucketFields = getBucketFields(row);
+ return ObjectInspectorUtils.getBucketNumber(bucketFields, inspectors, totalBuckets);
+ }
+
+ @Override
+ public void flush() throws StreamingIOFailure {
+ try {
+ for (RecordUpdater updater : updaters) {
+ if (updater != null) {
+ updater.flush();
+ }
+ }
+ } catch (IOException e) {
+ throw new StreamingIOFailure("Unable to flush recordUpdater", e);
+ }
+ }
+
+ @Override
+ public void clear() throws StreamingIOFailure {
+ }
+
+ /**
+ * Creates a new record updater for the new batch
+ * @param minWriteId smallest writeid in the batch
+ * @param maxWriteID largest writeid in the batch
+ * @throws StreamingIOFailure if failed to create record updater
+ */
+ @Override
+ public void newBatch(Long minWriteId, Long maxWriteID)
+ throws StreamingIOFailure, SerializationError {
+ curBatchMinWriteId = minWriteId;
+ curBatchMaxWriteId = maxWriteID;
+ updaters = new ArrayList<RecordUpdater>(totalBuckets);
+ for (int bucket = 0; bucket < totalBuckets; bucket++) {
+ updaters.add(bucket, null);//so that get(i) returns null rather than ArrayOutOfBounds
+ }
+ }
+
+ @Override
+ public void closeBatch() throws StreamingIOFailure {
+ boolean haveError = false;
+ for (RecordUpdater updater : updaters) {
+ if (updater != null) {
+ try {
+ //try not to leave any files open
+ updater.close(false);
+ } catch (Exception ex) {
+ haveError = true;
+ LOG.error("Unable to close " + updater + " due to: " + ex.getMessage(), ex);
+ }
+ }
+ }
+ updaters.clear();
+ if(haveError) {
+ throw new StreamingIOFailure("Encountered errors while closing (see logs) " + getWatermark());
+ }
+ }
+
+ protected static ObjectInspector[] getObjectInspectorsForBucketedCols(List<Integer> bucketIds
+ , StructObjectInspector recordObjInspector)
+ throws SerializationError {
+ ObjectInspector[] result = new ObjectInspector[bucketIds.size()];
+
+ for (int i = 0; i < bucketIds.size(); i++) {
+ int bucketId = bucketIds.get(i);
+ result[i] =
+ recordObjInspector.getAllStructFieldRefs().get( bucketId ).getFieldObjectInspector();
+ }
+ return result;
+ }
+
+
+ private Object[] getBucketFields(Object row) throws SerializationError {
+ StructObjectInspector recordObjInspector = getRecordObjectInspector();
+ StructField[] bucketStructFields = getBucketStructFields();
+ for (int i = 0; i < bucketIds.size(); i++) {
+ bucketFieldData[i] = recordObjInspector.getStructFieldData(row, bucketStructFields[i]);
+ }
+ return bucketFieldData;
+ }
+
+ private RecordUpdater createRecordUpdater(int bucketId, Long minWriteId, Long maxWriteID)
+ throws IOException, SerializationError {
+ try {
+ // Initialize table properties from the table parameters. This is required because the table
+ // may define certain table parameters that may be required while writing. The table parameter
+ // 'transactional_properties' is one such example.
+ Properties tblProperties = new Properties();
+ tblProperties.putAll(tbl.getParameters());
+ return outf.getRecordUpdater(partitionPath,
+ new AcidOutputFormat.Options(conf)
+ .inspector(getSerde().getObjectInspector())
+ .bucket(bucketId)
+ .tableProperties(tblProperties)
+ .minimumWriteId(minWriteId)
+ .maximumWriteId(maxWriteID)
+ .statementId(-1)
+ .finalDestination(partitionPath));
+ } catch (SerDeException e) {
+ throw new SerializationError("Failed to get object inspector from Serde "
+ + getSerde().getClass().getName(), e);
+ }
+ }
+
+ RecordUpdater getRecordUpdater(int bucketId) throws StreamingIOFailure, SerializationError {
+ RecordUpdater recordUpdater = updaters.get(bucketId);
+ if (recordUpdater == null) {
+ try {
+ recordUpdater = createRecordUpdater(bucketId, curBatchMinWriteId, curBatchMaxWriteId);
+ } catch (IOException e) {
+ String errMsg = "Failed creating RecordUpdater for " + getWatermark();
+ LOG.error(errMsg, e);
+ throw new StreamingIOFailure(errMsg, e);
+ }
+ updaters.set(bucketId, recordUpdater);
+ }
+ return recordUpdater;
+ }
+
+ private Path getPathForEndPoint(IMetaStoreClient msClient, HiveEndPoint endPoint)
+ throws StreamingException {
+ try {
+ String location;
+ if(endPoint.partitionVals==null || endPoint.partitionVals.isEmpty() ) {
+ location = msClient.getTable(endPoint.database,endPoint.table)
+ .getSd().getLocation();
+ } else {
+ location = msClient.getPartition(endPoint.database, endPoint.table,
+ endPoint.partitionVals).getSd().getLocation();
+ }
+ return new Path(location);
+ } catch (TException e) {
+ throw new StreamingException(e.getMessage()
+ + ". Unable to get path for end point: "
+ + endPoint.partitionVals, e);
+ }
+ }
+}
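
The getSerde()/encode() methods above are public precisely so that tests can exercise a writer's encoding without opening a transaction batch. A small, hypothetical sketch of that use with the DelimitedInputWriter that follows, again assuming a reachable metastore and a transactional, comma-delimited table; the URI, database, table and column names are illustrative only and not part of this patch.

import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hive.streaming.DelimitedInputWriter;
import org.apache.hive.streaming.HiveEndPoint;
import org.apache.hive.streaming.StreamingException;

public class EncodeSmokeCheck {
  public static void main(String[] args)
      throws StreamingException, ClassNotFoundException, SerDeException {
    HiveEndPoint endPt = new HiveEndPoint("thrift://localhost:9083", "testing", "alerts", null);
    // null HiveConf and null StreamingConnection: defaults are used, no transactions are opened
    DelimitedInputWriter writer =
        new DelimitedInputWriter(new String[]{"id", "msg"}, ",", endPt, null, null);
    Object row = writer.encode("1,hello".getBytes());   // parse one delimited record
    StructObjectInspector oi =
        (StructObjectInspector) writer.getSerde().getObjectInspector();
    System.out.println(oi.getStructFieldsDataAsList(row));  // e.g. [1, hello]
  }
}
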
http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/ConnectionError.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/ConnectionError.java b/streaming/src/java/org/apache/hive/streaming/ConnectionError.java
new file mode 100644
index 0000000..668bffb
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/ConnectionError.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class ConnectionError extends StreamingException {
+
+ public ConnectionError(String msg) {
+ super(msg);
+ }
+
+ public ConnectionError(String msg, Exception innerEx) {
+ super(msg, innerEx);
+ }
+
+ public ConnectionError(HiveEndPoint endPoint, Exception innerEx) {
+ super("Error connecting to " + endPoint +
+ (innerEx == null ? "" : ": " + innerEx.getMessage()), innerEx);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/DelimitedInputWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/DelimitedInputWriter.java b/streaming/src/java/org/apache/hive/streaming/DelimitedInputWriter.java
new file mode 100644
index 0000000..898b3f9
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/DelimitedInputWriter.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.io.BytesWritable;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Streaming writer that handles delimited input (e.g. CSV).
+ * Delimited input is parsed and reordered to match the column order of the table.
+ * Uses LazySimpleSerDe to process the delimited input.
+ */
+public class DelimitedInputWriter extends AbstractRecordWriter {
+ private final boolean reorderingNeeded;
+ private String delimiter;
+ private char serdeSeparator;
+ private int[] fieldToColMapping;
+ private final ArrayList<String> tableColumns;
+ private LazySimpleSerDe serde = null;
+
+ private final LazySimpleStructObjectInspector recordObjInspector;
+ private final ObjectInspector[] bucketObjInspectors;
+ private final StructField[] bucketStructFields;
+
+ static final private Logger LOG = LoggerFactory.getLogger(DelimitedInputWriter.class.getName());
+
+ /** Constructor. Uses default separator of the LazySimpleSerde
+ * @param colNamesForFields Column name assignment for input fields. Nulls or empty
+ * strings in the array indicate the fields to be skipped
+ * @param delimiter input field delimiter
+ * @param endPoint Hive endpoint
+ * @throws ConnectionError Problem talking to Hive
+ * @throws ClassNotFoundException Serde class not found
+ * @throws SerializationError Serde initialization/interaction failed
+ * @throws StreamingException Problem acquiring file system path for partition
+ * @throws InvalidColumn any element in colNamesForFields refers to a non existing column
+ */
+ public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+ HiveEndPoint endPoint, StreamingConnection conn)
+ throws ClassNotFoundException, ConnectionError, SerializationError,
+ InvalidColumn, StreamingException {
+ this(colNamesForFields, delimiter, endPoint, null, conn);
+ }
+ /** Constructor. Uses the default separator of LazySimpleSerDe.
+ * @param colNamesForFields Column name assignment for input fields. Nulls or empty
+ * strings in the array indicate the fields to be skipped
+ * @param delimiter input field delimiter
+ * @param endPoint Hive endpoint
+ * @param conf a Hive conf object. Can be null if not using advanced hive settings.
+ * @param conn connection this Writer is to be used with
+ * @throws ConnectionError Problem talking to Hive
+ * @throws ClassNotFoundException Serde class not found
+ * @throws SerializationError Serde initialization/interaction failed
+ * @throws StreamingException Problem acquiring file system path for partition
+ * @throws InvalidColumn any element in colNamesForFields refers to a non-existent column
+ */
+ public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+ HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
+ throws ClassNotFoundException, ConnectionError, SerializationError,
+ InvalidColumn, StreamingException {
+ this(colNamesForFields, delimiter, endPoint, conf,
+ (char) LazySerDeParameters.DefaultSeparators[0], conn);
+ }
+ /**
+ * Constructor. Allows overriding the separator of LazySimpleSerDe.
+ * @param colNamesForFields Column name assignment for input fields
+ * @param delimiter input field delimiter
+ * @param endPoint Hive endpoint
+ * @param conf a Hive conf object. Set to null if not using advanced hive settings.
+ * @param serdeSeparator separator used when encoding data that is fed into the
+ * LazySimpleSerde. Ensure this separator does not occur
+ * in the field data
+ * @param conn connection this Writer is to be used with
+ * @throws ConnectionError Problem talking to Hive
+ * @throws ClassNotFoundException Serde class not found
+ * @throws SerializationError Serde initialization/interaction failed
+ * @throws StreamingException Problem acquiring file system path for partition
+ * @throws InvalidColumn any element in colNamesForFields refers to a non-existent column
+ */
+ public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+ HiveEndPoint endPoint, HiveConf conf, char serdeSeparator, StreamingConnection conn)
+ throws ClassNotFoundException, ConnectionError, SerializationError,
+ InvalidColumn, StreamingException {
+ super(endPoint, conf, conn);
+ this.tableColumns = getCols(tbl);
+ this.serdeSeparator = serdeSeparator;
+ this.delimiter = delimiter;
+ this.fieldToColMapping = getFieldReordering(colNamesForFields, getTableColumns());
+ this.reorderingNeeded = isReorderingNeeded(delimiter, getTableColumns());
+ LOG.debug("Field reordering needed = " + this.reorderingNeeded + ", for endpoint " + endPoint);
+ this.serdeSeparator = serdeSeparator;
+ this.serde = createSerde(tbl, conf, serdeSeparator);
+
+ // get ObjInspectors for entire record and bucketed cols
+ try {
+ this.recordObjInspector = (LazySimpleStructObjectInspector) serde.getObjectInspector();
+ this.bucketObjInspectors = getObjectInspectorsForBucketedCols(bucketIds, recordObjInspector);
+ } catch (SerDeException e) {
+ throw new SerializationError("Unable to get ObjectInspector for bucket columns", e);
+ }
+
+ // get StructFields for bucketed cols
+ bucketStructFields = new StructField[bucketIds.size()];
+ List<? extends StructField> allFields = recordObjInspector.getAllStructFieldRefs();
+ for (int i = 0; i < bucketIds.size(); i++) {
+ bucketStructFields[i] = allFields.get(bucketIds.get(i));
+ }
+ }
+ /**
+ * @deprecated As of release 1.3/2.1. Replaced by {@link #DelimitedInputWriter(String[], String, HiveEndPoint, StreamingConnection)}
+ */
+ public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+ HiveEndPoint endPoint)
+ throws ClassNotFoundException, ConnectionError, SerializationError,
+ InvalidColumn, StreamingException {
+ this(colNamesForFields, delimiter, endPoint, null, null);
+ }
+ /**
+ * @deprecated As of release 1.3/2.1. Replaced by {@link #DelimitedInputWriter(String[], String, HiveEndPoint, HiveConf, StreamingConnection)}
+ */
+ public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+ HiveEndPoint endPoint, HiveConf conf)
+ throws ClassNotFoundException, ConnectionError, SerializationError,
+ InvalidColumn, StreamingException {
+ this(colNamesForFields, delimiter, endPoint, conf,
+ (char) LazySerDeParameters.DefaultSeparators[0], null);
+ }
+ /**
+ * @deprecated As of release 1.3/2.1. Replaced by {@link #DelimitedInputWriter(String[], String, HiveEndPoint, HiveConf, char, StreamingConnection)}
+ */
+ public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+ HiveEndPoint endPoint, HiveConf conf, char serdeSeparator)
+ throws ClassNotFoundException, StreamingException {
+ this(colNamesForFields, delimiter, endPoint, conf, serdeSeparator, null);
+ }
+
+ private boolean isReorderingNeeded(String delimiter, ArrayList<String> tableColumns) {
+ return !( delimiter.equals(String.valueOf(getSerdeSeparator()))
+ && areFieldsInColOrder(fieldToColMapping)
+ && tableColumns.size()>=fieldToColMapping.length );
+ }
+
+ private static boolean areFieldsInColOrder(int[] fieldToColMapping) {
+ for(int i=0; i<fieldToColMapping.length; ++i) {
+ if(fieldToColMapping[i]!=i) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @VisibleForTesting
+ static int[] getFieldReordering(String[] colNamesForFields, List<String> tableColNames)
+ throws InvalidColumn {
+ int[] result = new int[ colNamesForFields.length ];
+ for(int i=0; i<colNamesForFields.length; ++i) {
+ result[i] = -1;
+ }
+ int i=-1, fieldLabelCount=0;
+ for( String col : colNamesForFields ) {
+ ++i;
+ if(col == null) {
+ continue;
+ }
+ if( col.trim().isEmpty() ) {
+ continue;
+ }
+ ++fieldLabelCount;
+ int loc = tableColNames.indexOf(col);
+ if(loc == -1) {
+ throw new InvalidColumn("Column '" + col + "' not found in table for input field " + (i + 1));
+ }
+ result[i] = loc;
+ }
+ if(fieldLabelCount>tableColNames.size()) {
+ throw new InvalidColumn("Number of field names exceeds the number of columns in table");
+ }
+ return result;
+ }
+
+ // Reorder fields in record based on the order of columns in the table
+ protected byte[] reorderFields(byte[] record) throws UnsupportedEncodingException {
+ if(!reorderingNeeded) {
+ return record;
+ }
+ String[] reorderedFields = new String[getTableColumns().size()];
+ String decoded = new String(record);
+ String[] fields = decoded.split(delimiter,-1);
+ for (int i=0; i<fieldToColMapping.length; ++i) {
+ int newIndex = fieldToColMapping[i];
+ if(newIndex != -1) {
+ reorderedFields[newIndex] = fields[i];
+ }
+ }
+ return join(reorderedFields, getSerdeSeparator());
+ }
+
+ // handles nulls in items[]
+ // TODO: perhaps can be made more efficient by creating a byte[] directly
+ private static byte[] join(String[] items, char separator) {
+ StringBuilder buff = new StringBuilder(100);
+ if(items.length == 0)
+ return "".getBytes();
+ int i=0;
+ for(; i<items.length-1; ++i) {
+ if(items[i]!=null) {
+ buff.append(items[i]);
+ }
+ buff.append(separator);
+ }
+ if(items[i]!=null) {
+ buff.append(items[i]);
+ }
+ return buff.toString().getBytes();
+ }
+
+ protected ArrayList<String> getTableColumns() {
+ return tableColumns;
+ }
+
+ @Override
+ public void write(long writeId, byte[] record)
+ throws SerializationError, StreamingIOFailure {
+ try {
+ byte[] orderedFields = reorderFields(record);
+ Object encodedRow = encode(orderedFields);
+ int bucket = getBucket(encodedRow);
+ getRecordUpdater(bucket).insert(writeId, encodedRow);
+ } catch (IOException e) {
+ throw new StreamingIOFailure("Error writing record in transaction write id ("
+ + writeId + ")", e);
+ }
+ }
+
+ @Override
+ public AbstractSerDe getSerde() {
+ return serde;
+ }
+
+ protected LazySimpleStructObjectInspector getRecordObjectInspector() {
+ return recordObjInspector;
+ }
+
+ @Override
+ protected StructField[] getBucketStructFields() {
+ return bucketStructFields;
+ }
+
+ protected ObjectInspector[] getBucketObjectInspectors() {
+ return bucketObjInspectors;
+ }
+
+ @Override
+ public Object encode(byte[] record) throws SerializationError {
+ try {
+ BytesWritable blob = new BytesWritable();
+ blob.set(record, 0, record.length);
+ return serde.deserialize(blob);
+ } catch (SerDeException e) {
+ throw new SerializationError("Unable to convert byte[] record into Object", e);
+ }
+ }
+
+ /**
+ * Creates the LazySimpleSerDe used to encode incoming records
+ * @param tbl table being written to
+ * @param conf Hive configuration
+ * @param serdeSeparator field separator to configure on the serde
+ * @return the initialized LazySimpleSerDe
+ * @throws SerializationError if the serde could not be initialized
+ */
+ protected static LazySimpleSerDe createSerde(Table tbl, HiveConf conf, char serdeSeparator)
+ throws SerializationError {
+ try {
+ Properties tableProps = MetaStoreUtils.getTableMetadata(tbl);
+ tableProps.setProperty("field.delim", String.valueOf(serdeSeparator));
+ LazySimpleSerDe serde = new LazySimpleSerDe();
+ SerDeUtils.initializeSerDe(serde, conf, tableProps, null);
+ return serde;
+ } catch (SerDeException e) {
+ throw new SerializationError("Error initializing serde", e);
+ }
+ }
+
+ private ArrayList<String> getCols(Table table) {
+ List<FieldSchema> cols = table.getSd().getCols();
+ ArrayList<String> colNames = new ArrayList<String>(cols.size());
+ for (FieldSchema col : cols) {
+ colNames.add(col.getName().toLowerCase());
+ }
+ return colNames;
+ }
+
+ public char getSerdeSeparator() {
+ return serdeSeparator;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/HeartBeatFailure.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/HeartBeatFailure.java b/streaming/src/java/org/apache/hive/streaming/HeartBeatFailure.java
new file mode 100644
index 0000000..b1f9520
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/HeartBeatFailure.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import java.util.Collection;
+import java.util.Set;
+
+public class HeartBeatFailure extends StreamingException {
+ private Collection<Long> abortedTxns;
+ private Collection<Long> nosuchTxns;
+
+ public HeartBeatFailure(Collection<Long> abortedTxns, Set<Long> nosuchTxns) {
+ super("Heart beat error. InvalidTxns: " + nosuchTxns + ". AbortedTxns: " + abortedTxns);
+ this.abortedTxns = abortedTxns;
+ this.nosuchTxns = nosuchTxns;
+ }
+}
[2/4] hive git commit: HIVE-19210: Create separate module for
streaming ingest (Prasanth Jayachandran reviewed by Eugene Koifman)
Posted by pr...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/package.html
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/package.html b/streaming/src/java/org/apache/hive/streaming/package.html
new file mode 100644
index 0000000..2b45792
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/package.html
@@ -0,0 +1,181 @@
+<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"
+ "http://www.w3.org/TR/html4/loose.dtd">
+
+<html lang="en">
+
+<head>
+<meta name=Title content="HCatalog Streaming API">
+<meta name=Keywords content="HCatalog Streaming ACID">
+<meta http-equiv=Content-Type content="text/html; charset=macintosh">
+<title>HCatalog Streaming API</title>
+</head>
+
+<body>
+
+<h1>HCatalog Streaming API -- high level description</h1>
+
+<b>NOTE: The Streaming API feature is provided as a technology
+preview. The API may undergo incompatible changes in upcoming
+releases.</b>
+
+<p>
+Traditionally, adding new data into Hive requires gathering a large
+amount of data onto HDFS and then periodically adding a new
+partition. This is essentially a <i>batch insertion</i>. Insertion of
+new data into an existing partition or table is not done in a way that
+gives consistent results to readers. The Hive Streaming API allows data to
+be pumped continuously into Hive. The incoming data can be
+continuously committed in small batches (of records) into a Hive
+partition. Once data is committed it becomes immediately visible to
+all Hive queries initiated subsequently.</p>
+
+<p>
+This API is intended for streaming clients such as NiFi, Flume and Storm,
+which continuously generate data. Streaming support is built on top of
+ACID based insert/update support in Hive.</p>
+
+<p>
+The classes and interfaces that make up the Hive streaming API are broadly
+categorized into two sets. The first set provides support for connection
+and transaction management, while the second set provides I/O
+support. Transactions are managed by the Hive MetaStore. Writes are
+performed to HDFS via Hive wrapper APIs that bypass the MetaStore. </p>
+
+<p>
+<b>Note on packaging</b>: The APIs are defined in the
+<b>org.apache.hive.streaming</b> Java package and included as
+the hive-streaming jar.</p>
+
+<h2>STREAMING REQUIREMENTS</h2>
+
+<p>
+A few things are currently required to use streaming.
+</p>
+
+<p>
+<ol>
+ <li> Currently, only ORC storage format is supported. So
+ '<b>stored as orc</b>' must be specified during table creation.</li>
+ <li> The hive table may be bucketed but must not be sorted. </li>
+ <li> User of the client streaming process must have the necessary
+ permissions to write to the table or partition and create partitions in
+ the table.</li>
+ <li> Currently, when issuing queries on streaming tables, query client must set
+ <ol>
+ <li><b>hive.input.format =
+ org.apache.hadoop.hive.ql.io.HiveInputFormat</b></li>
+ </ol></li>
+ The above client settings are a temporary requirement and the intention is to
+ drop the need for them in the near future.
+ <li> Settings required in hive-site.xml for Metastore:
+ <ol>
+ <li><b>hive.txn.manager =
+ org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</b></li>
+ <li><b>hive.support.concurrency = true </b> </li>
+ <li><b>hive.compactor.initiator.on = true</b> </li>
+ <li><b>hive.compactor.worker.threads > 0 </b> </li>
+ </ol></li>
+</ol></p>
+
+<p>
+<b>Note:</b> Streaming to <b>unpartitioned</b> tables is also
+supported.</p>
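+
+<p>
+For reference, the Metastore-side settings listed above would appear in
+hive-site.xml roughly as follows (a sketch only; the worker thread count
+is illustrative and should be sized for the deployment):</p>
+
+<pre>
+  &lt;property&gt;
+    &lt;name&gt;hive.txn.manager&lt;/name&gt;
+    &lt;value&gt;org.apache.hadoop.hive.ql.lockmgr.DbTxnManager&lt;/value&gt;
+  &lt;/property&gt;
+  &lt;property&gt;
+    &lt;name&gt;hive.support.concurrency&lt;/name&gt;
+    &lt;value&gt;true&lt;/value&gt;
+  &lt;/property&gt;
+  &lt;property&gt;
+    &lt;name&gt;hive.compactor.initiator.on&lt;/name&gt;
+    &lt;value&gt;true&lt;/value&gt;
+  &lt;/property&gt;
+  &lt;property&gt;
+    &lt;name&gt;hive.compactor.worker.threads&lt;/name&gt;
+    &lt;value&gt;2&lt;/value&gt;
+  &lt;/property&gt;
+</pre>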
+
+<h2>Transaction and Connection management</h2>
+
+<p>
+The class <a href="HiveEndPoint.html"><b>HiveEndPoint</b></a> is a Hive end
+point to connect to. An endpoint is either a Hive table or
+partition. An endpoint is cheap to create and does not internally hold
+on to any network connections. Invoking the newConnection method on
+it creates a new connection to the Hive MetaStore for streaming
+purposes. It returns a
+<a href="StreamingConnection.html"><b>StreamingConnection</b></a>
+object. Multiple connections can be established on the same
+endpoint. StreamingConnection can then be used to initiate new
+transactions for performing I/O. </p>
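+
+<p>
+A minimal connection sketch (the metastore URI, database, table, partition
+values and agent name are placeholders; java.util.Arrays is assumed to be
+imported):</p>
+
+<pre>
+  HiveEndPoint endPoint = new HiveEndPoint("thrift://localhost:9083",
+      "testdb", "page_views", Arrays.asList("Asia", "India"));
+  // 'true' asks the API to auto-create the partition if it does not exist;
+  // agentInfo should identify the writing process in logs and monitoring consoles
+  StreamingConnection connection = endPoint.newConnection(true, "example-agent-1");
+  // ... fetch and consume transaction batches (see the sketch further below), then:
+  connection.close();
+</pre>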
+
+<h3>Dynamic Partition Creation:</h3> In a setup where data is being
+streamed continuously (e.g. from Flume), it is often desirable to have
+new partitions created automatically (say, on an hourly basis). In such
+cases, requiring the Hive admin to pre-create the necessary partitions
+may not be reasonable. Consequently the streaming API allows streaming
+clients to create partitions as needed.
+<b>HiveEndPoint.newConnection()</b> accepts an argument that indicates
+whether the partition should be auto-created. Partition creation is an
+atomic action, so multiple clients can race to create the partition,
+but only one will succeed; streaming clients therefore need not
+synchronize when creating a partition. The user of the client process
+needs to be given write permissions on the Hive table in order to
+create partitions.
+
+<h3>Batching Transactions:</h3> Transactions are implemented slightly
+differently than in traditional database systems. Multiple transactions
+are grouped into a <i>Transaction Batch</i> and each transaction has
+an id. Data from each transaction batch is written to a single file on HDFS,
+which eventually gets compacted with other files into a larger file
+automatically for efficiency.
+
+<h3>Basic Steps:</h3> After a connection is established, a streaming
+client first requests a new batch of transactions. In response it
+receives a set of transaction ids that are part of the transaction
+batch. Subsequently the client proceeds to consume one transaction at
+a time by initiating new transactions. The client will write() one or more
+records per transaction and either commit or abort the current
+transaction before switching to the next one. Each
+<b>TransactionBatch.write()</b> invocation automatically associates
+the I/O attempt with the current transaction id. The user of the
+streaming client needs to have write permissions to the partition or
+table. A minimal sketch of this flow appears below.</p>
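+
+<p>
+Continuing the connection sketch above, a rough outline of consuming one
+transaction batch (error handling and abort() are omitted; the field names
+and records are illustrative):</p>
+
+<pre>
+  String[] fieldNames = {"id", "msg"};   // target table columns, in input-field order
+  DelimitedInputWriter writer =
+      new DelimitedInputWriter(fieldNames, ",", endPoint, connection);
+  TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
+  while (txnBatch.remainingTransactions() > 0) {
+    txnBatch.beginNextTransaction();
+    // in practice each transaction would carry newly arrived records
+    txnBatch.write("1,Hello streaming".getBytes());
+    txnBatch.write("2,Welcome to streaming".getBytes());
+    txnBatch.commit();
+  }
+  txnBatch.close();
+</pre>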
+
+<p>
+<b>Concurrency Note:</b> I/O can be performed on multiple
+<b>TransactionBatch</b>es concurrently. However, the transactions within a
+transaction batch must be consumed sequentially.</p>
+
+<h2>Writing Data</h2>
+
+<p>
+These classes and interfaces provide support for writing the data to
+Hive within a transaction.
+<a href="RecordWriter.html"><b>RecordWriter</b></a> is the interface
+implemented by all writers. A writer is responsible for taking a
+record in the form of a <b>byte[]</b> containing data in a known
+format (e.g. CSV) and writing it out in the format supported by Hive
+streaming. A <b>RecordWriter</b> may reorder or drop fields from the incoming
+record if necessary to map them to the corresponding columns in the
+Hive Table. A streaming client will instantiate an appropriate
+<b>RecordWriter</b> type and pass it to
+<b>StreamingConnection.fetchTransactionBatch()</b>. The streaming client
+does not directly interact with the <b>RecordWriter</b> thereafter, but
+relies on the <b>TransactionBatch</b> to do so.</p>
+
+<p>
+Currently, out of the box, the streaming API provides three
+implementations of the <b>RecordWriter</b> interface: one handles delimited
+input data (such as CSV, tab separated, etc.), one handles JSON
+(strict syntax), and one handles regex-matched text input. Support for other input formats can be provided by
+additional implementations of the <b>RecordWriter</b> interface.
+<ul>
+<li> <a href="DelimitedInputWriter.html"><b>DelimitedInputWriter</b></a>
+- Delimited text input.</li>
+<li> <a href="StrictJsonWriter.html"><b>StrictJsonWriter</b></a>
+- JSON text input.</li>
+ <li> <a href="StrictRegexWriter.html"><b>StrictRegexWriter</b></a>
+ - text input with regex.</li>
+</ul></p>
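+
+<p>
+For example, <b>DelimitedInputWriter</b> can drop and reorder incoming
+fields: a null (or empty) entry in the field name array marks an input
+field to skip, and the remaining fields are mapped onto the named table
+columns regardless of their position in the input. A sketch, reusing the
+connection from the earlier example (column and field names are
+illustrative):</p>
+
+<pre>
+  // target table columns: (id, msg, source); the 2nd input field is dropped,
+  // the 1st and 3rd are reordered to the table's column order, and the
+  // unmapped 'source' column is left null
+  String[] colNamesForFields = {"msg", null, "id"};
+  DelimitedInputWriter writer =
+      new DelimitedInputWriter(colNamesForFields, ",", endPoint, connection);
+</pre>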
+
+<h2>Performance, Concurrency, Etc.</h2>
+<p>
+ Each StreamingConnection writes data at the rate the underlying
+ FileSystem can accept it. If that is not sufficient, multiple StreamingConnection objects can
+ be created concurrently.
+</p>
+<p>
+ Each StreamingConnection can have at most 1 outstanding TransactionBatch and each TransactionBatch
+ may have at most 2 threads operating on it.
+ See <a href="TransactionBatch.html"><b>TransactionBatch</b></a>.
+</p>
+</body>
+
+</html>
http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/test/org/apache/hive/streaming/TestDelimitedInputWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/org/apache/hive/streaming/TestDelimitedInputWriter.java b/streaming/src/test/org/apache/hive/streaming/TestDelimitedInputWriter.java
new file mode 100644
index 0000000..f0843a1
--- /dev/null
+++ b/streaming/src/test/org/apache/hive/streaming/TestDelimitedInputWriter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+import junit.framework.Assert;
+
+public class TestDelimitedInputWriter {
+ @Test
+ public void testFieldReordering() throws Exception {
+
+ ArrayList<String> colNames = Lists.newArrayList(new String[]{"col1", "col2", "col3", "col4", "col5"});
+ {//1) test dropping fields - first middle & last
+ String[] fieldNames = {null, "col2", null, "col4", null};
+ int[] mapping = DelimitedInputWriter.getFieldReordering(fieldNames, colNames);
+ Assert.assertTrue(Arrays.equals(mapping, new int[]{-1, 1, -1, 3, -1}));
+ }
+
+ {//2) test reordering
+ String[] fieldNames = {"col5", "col4", "col3", "col2", "col1"};
+ int[] mapping = DelimitedInputWriter.getFieldReordering(fieldNames, colNames);
+ Assert.assertTrue( Arrays.equals(mapping, new int[]{4,3,2,1,0}) );
+ }
+
+ {//3) test bad field names
+ String[] fieldNames = {"xyz", "abc", "col3", "col4", "as"};
+ try {
+ DelimitedInputWriter.getFieldReordering(fieldNames, colNames);
+ Assert.fail();
+ } catch (InvalidColumn e) {
+ // should throw
+ }
+ }
+
+ {//4) test few field names
+ String[] fieldNames = {"col3", "col4"};
+ int[] mapping = DelimitedInputWriter.getFieldReordering(fieldNames, colNames);
+ Assert.assertTrue( Arrays.equals(mapping, new int[]{2,3}) );
+ }
+
+ {//5) test extra field names
+ String[] fieldNames = {"col5", "col4", "col3", "col2", "col1", "col1"};
+ try {
+ DelimitedInputWriter.getFieldReordering(fieldNames, colNames);
+ Assert.fail();
+ } catch (InvalidColumn e) {
+ // should throw
+ }
+ }
+ }
+}
[3/4] hive git commit: HIVE-19210: Create separate module for
streaming ingest (Prasanth Jayachandran reviewed by Eugene Koifman)
Posted by pr...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/HiveEndPoint.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/HiveEndPoint.java b/streaming/src/java/org/apache/hive/streaming/HiveEndPoint.java
new file mode 100644
index 0000000..b04e137
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/HiveEndPoint.java
@@ -0,0 +1,1117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.LockComponentBuilder;
+import org.apache.hadoop.hive.metastore.LockRequestBuilder;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
+import org.apache.hadoop.hive.ql.DriverFactory;
+import org.apache.hadoop.hive.ql.IDriver;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Information about the Hive end point (i.e. table or partition) to write to.
+ * A lightweight object that does NOT internally hold on to resources such as
+ * network connections. It can be stored in hashed containers such as sets and hash tables.
+ */
+public class HiveEndPoint {
+ public final String metaStoreUri;
+ public final String database;
+ public final String table;
+ public final ArrayList<String> partitionVals;
+
+
+ static final private Logger LOG = LoggerFactory.getLogger(HiveEndPoint.class.getName());
+
+ /**
+ *
+ * @param metaStoreUri URI of the metastore to connect to eg: thrift://localhost:9083
+ * @param database Name of the Hive database
+ * @param table Name of table to stream to
+ * @param partitionVals Indicates the specific partition to stream to. Can be null or empty List
+ * if streaming to a table without partitions. The order of values in this
+ * list must correspond exactly to the order of partition columns specified
+ * during the table creation. E.g. For a table partitioned by
+ * (continent string, country string), partitionVals could be the list
+ * ("Asia", "India").
+ */
+ public HiveEndPoint(String metaStoreUri
+ , String database, String table, List<String> partitionVals) {
+ this.metaStoreUri = metaStoreUri;
+ if (database==null) {
+ throw new IllegalArgumentException("Database cannot be null for HiveEndPoint");
+ }
+ this.database = database;
+ this.table = table;
+ if (table==null) {
+ throw new IllegalArgumentException("Table cannot be null for HiveEndPoint");
+ }
+ this.partitionVals = partitionVals==null ? new ArrayList<String>()
+ : new ArrayList<String>( partitionVals );
+ }
+
+
+ /**
+ * @deprecated As of release 1.3/2.1. Replaced by {@link #newConnection(boolean, String)}
+ */
+ @Deprecated
+ public StreamingConnection newConnection(final boolean createPartIfNotExists)
+ throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
+ , ImpersonationFailed , InterruptedException {
+ return newConnection(createPartIfNotExists, null, null, null);
+ }
+ /**
+ * @deprecated As of release 1.3/2.1. Replaced by {@link #newConnection(boolean, HiveConf, String)}
+ */
+ @Deprecated
+ public StreamingConnection newConnection(final boolean createPartIfNotExists, HiveConf conf)
+ throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
+ , ImpersonationFailed , InterruptedException {
+ return newConnection(createPartIfNotExists, conf, null, null);
+ }
+ /**
+ * @deprecated As of release 1.3/2.1. Replaced by {@link #newConnection(boolean, HiveConf, UserGroupInformation, String)}
+ */
+ @Deprecated
+ public StreamingConnection newConnection(final boolean createPartIfNotExists, final HiveConf conf,
+ final UserGroupInformation authenticatedUser)
+ throws ConnectionError, InvalidPartition,
+ InvalidTable, PartitionCreationFailed, ImpersonationFailed , InterruptedException {
+ return newConnection(createPartIfNotExists, conf, authenticatedUser, null);
+ }
+ /**
+ * Acquire a new connection to MetaStore for streaming
+ * @param createPartIfNotExists If true, the partition specified in the endpoint
+ * will be auto created if it does not exist
+ * @param agentInfo should uniquely identify the process/entity that is using this batch. This
+ * should be something that can be correlated with calling application log files
+ * and/or monitoring consoles.
+ * @return
+ * @throws ConnectionError if problem connecting
+ * @throws InvalidPartition if specified partition is not valid (createPartIfNotExists = false)
+ * @throws ImpersonationFailed if not able to impersonate 'proxyUser'
+ * @throws PartitionCreationFailed if failed to create partition
+ * @throws InterruptedException
+ */
+ public StreamingConnection newConnection(final boolean createPartIfNotExists, String agentInfo)
+ throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
+ , ImpersonationFailed , InterruptedException {
+ return newConnection(createPartIfNotExists, null, null, agentInfo);
+ }
+
+ /**
+ * Acquire a new connection to MetaStore for streaming
+ * @param createPartIfNotExists If true, the partition specified in the endpoint
+ * will be auto created if it does not exist
+ * @param conf HiveConf object, set it to null if not using advanced hive settings.
+ * @param agentInfo should uniquely identify the process/entity that is using this batch. This
+ * should be something that can be correlated with calling application log files
+ * and/or monitoring consoles.
+ * @return
+ * @throws ConnectionError if problem connecting
+ * @throws InvalidPartition if specified partition is not valid (createPartIfNotExists = false)
+ * @throws ImpersonationFailed if not able to impersonate 'proxyUser'
+ * @throws PartitionCreationFailed if failed to create partition
+ * @throws InterruptedException
+ */
+ public StreamingConnection newConnection(final boolean createPartIfNotExists, HiveConf conf, String agentInfo)
+ throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
+ , ImpersonationFailed , InterruptedException {
+ return newConnection(createPartIfNotExists, conf, null, agentInfo);
+ }
+
+ /**
+ * Acquire a new connection to MetaStore for streaming. To connect using Kerberos,
+ * 'authenticatedUser' argument should have been used to do a kerberos login. Additionally the
+ * 'hive.metastore.kerberos.principal' setting should be set correctly either in hive-site.xml or
+ * in the 'conf' argument (if not null). If using hive-site.xml, it should be in classpath.
+ *
+ * @param createPartIfNotExists If true, the partition specified in the endpoint
+ * will be auto created if it does not exist
+ * @param conf HiveConf object to be used for the connection. Can be null.
+ * @param authenticatedUser UserGroupInformation object obtained from successful authentication.
+ * Uses non-secure mode if this argument is null.
+ * @param agentInfo should uniquely identify the process/entity that is using this batch. This
+ * should be something that can be correlated with calling application log files
+ * and/or monitoring consoles.
+ * @return
+ * @throws ConnectionError if there is a connection problem
+ * @throws InvalidPartition if specified partition is not valid (createPartIfNotExists = false)
+ * @throws ImpersonationFailed if not able to impersonate 'username'
+ * @throws PartitionCreationFailed if failed to create partition
+ * @throws InterruptedException
+ */
+ public StreamingConnection newConnection(final boolean createPartIfNotExists, final HiveConf conf,
+ final UserGroupInformation authenticatedUser, final String agentInfo)
+ throws ConnectionError, InvalidPartition,
+ InvalidTable, PartitionCreationFailed, ImpersonationFailed , InterruptedException {
+
+ if( authenticatedUser==null ) {
+ return newConnectionImpl(authenticatedUser, createPartIfNotExists, conf, agentInfo);
+ }
+
+ try {
+ return authenticatedUser.doAs (
+ new PrivilegedExceptionAction<StreamingConnection>() {
+ @Override
+ public StreamingConnection run()
+ throws ConnectionError, InvalidPartition, InvalidTable
+ , PartitionCreationFailed {
+ return newConnectionImpl(authenticatedUser, createPartIfNotExists, conf, agentInfo);
+ }
+ }
+ );
+ } catch (IOException e) {
+ throw new ConnectionError("Failed to connect as : " + authenticatedUser.getShortUserName(), e);
+ }
+ }
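+
+ // A minimal secure-mode sketch (illustrative only; 'endPoint' is a HiveEndPoint instance,
+ // the principal, keytab path and agent name are placeholders, and 'conf' is a HiveConf
+ // with 'hive.metastore.kerberos.principal' set as described above):
+ //
+ // UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
+ // "streaming-client@EXAMPLE.COM", "/path/to/client.keytab");
+ // StreamingConnection conn = endPoint.newConnection(true, conf, ugi, "example-agent-1");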
+
+ private StreamingConnection newConnectionImpl(UserGroupInformation ugi,
+ boolean createPartIfNotExists, HiveConf conf, String agentInfo)
+ throws ConnectionError, InvalidPartition, InvalidTable
+ , PartitionCreationFailed {
+ return new ConnectionImpl(this, ugi, conf, createPartIfNotExists, agentInfo);
+ }
+
+ private static UserGroupInformation getUserGroupInfo(String user)
+ throws ImpersonationFailed {
+ try {
+ return UserGroupInformation.createProxyUser(
+ user, UserGroupInformation.getLoginUser());
+ } catch (IOException e) {
+ LOG.error("Unable to get UserGroupInfo for user : " + user, e);
+ throw new ImpersonationFailed(user,e);
+ }
+ }
+
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ HiveEndPoint endPoint = (HiveEndPoint) o;
+
+ if (database != null
+ ? !database.equals(endPoint.database)
+ : endPoint.database != null ) {
+ return false;
+ }
+ if (metaStoreUri != null
+ ? !metaStoreUri.equals(endPoint.metaStoreUri)
+ : endPoint.metaStoreUri != null ) {
+ return false;
+ }
+ if (!partitionVals.equals(endPoint.partitionVals)) {
+ return false;
+ }
+ if (table != null ? !table.equals(endPoint.table) : endPoint.table != null) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = metaStoreUri != null ? metaStoreUri.hashCode() : 0;
+ result = 31 * result + (database != null ? database.hashCode() : 0);
+ result = 31 * result + (table != null ? table.hashCode() : 0);
+ result = 31 * result + partitionVals.hashCode();
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "{" +
+ "metaStoreUri='" + metaStoreUri + '\'' +
+ ", database='" + database + '\'' +
+ ", table='" + table + '\'' +
+ ", partitionVals=" + partitionVals + " }";
+ }
+
+
+ private static class ConnectionImpl implements StreamingConnection {
+ private final IMetaStoreClient msClient;
+ private final IMetaStoreClient heartbeaterMSClient;
+ private final HiveEndPoint endPt;
+ private final UserGroupInformation ugi;
+ private final String username;
+ private final boolean secureMode;
+ private final String agentInfo;
+
+ /**
+ * @param endPoint end point to connect to
+ * @param ugi on behalf of whom streaming is done. cannot be null
+ * @param conf HiveConf object
+ * @param createPart create the partition if it does not exist
+ * @throws ConnectionError if there is trouble connecting
+ * @throws InvalidPartition if specified partition does not exist (and createPart=false)
+ * @throws InvalidTable if specified table does not exist
+ * @throws PartitionCreationFailed if createPart=true and not able to create partition
+ */
+ private ConnectionImpl(HiveEndPoint endPoint, UserGroupInformation ugi,
+ HiveConf conf, boolean createPart, String agentInfo)
+ throws ConnectionError, InvalidPartition, InvalidTable
+ , PartitionCreationFailed {
+ this.endPt = endPoint;
+ this.ugi = ugi;
+ this.agentInfo = agentInfo;
+ this.username = ugi==null ? System.getProperty("user.name") : ugi.getShortUserName();
+ if (conf==null) {
+ conf = HiveEndPoint.createHiveConf(this.getClass(), endPoint.metaStoreUri);
+ }
+ else {
+ overrideConfSettings(conf);
+ }
+ this.secureMode = ugi==null ? false : ugi.hasKerberosCredentials();
+ this.msClient = getMetaStoreClient(endPoint, conf, secureMode);
+ // We use a separate metastore client for heartbeat calls to ensure heartbeat RPC calls are
+ // isolated from the other transaction related RPC calls.
+ this.heartbeaterMSClient = getMetaStoreClient(endPoint, conf, secureMode);
+ checkEndPoint(endPoint, msClient);
+ if (createPart && !endPoint.partitionVals.isEmpty()) {
+ createPartitionIfNotExists(endPoint, msClient, conf);
+ }
+ }
+
+ /**
+ * Checks the validity of endpoint
+ * @param endPoint the HiveEndPoint to be checked
+ * @param msClient the metastore client
+ * @throws InvalidTable
+ */
+ private void checkEndPoint(HiveEndPoint endPoint, IMetaStoreClient msClient)
+ throws InvalidTable, ConnectionError {
+ Table t;
+ try {
+ t = msClient.getTable(endPoint.database, endPoint.table);
+ } catch (Exception e) {
+ LOG.warn("Unable to check the endPoint: " + endPoint, e);
+ throw new InvalidTable(endPoint.database, endPoint.table, e);
+ }
+ // 1 - check that the table is Acid
+ if (!AcidUtils.isFullAcidTable(t)) {
+ LOG.error("HiveEndPoint " + endPoint + " must use an acid table");
+ throw new InvalidTable(endPoint.database, endPoint.table, "is not an Acid table");
+ }
+
+ // 2 - check if partitionvals are legitimate
+ if (t.getPartitionKeys() != null && !t.getPartitionKeys().isEmpty()
+ && endPoint.partitionVals.isEmpty()) {
+ // Invalid if table is partitioned, but endPoint's partitionVals is empty
+ String errMsg = "HiveEndPoint " + endPoint + " doesn't specify any partitions for " +
+ "partitioned table";
+ LOG.error(errMsg);
+ throw new ConnectionError(errMsg);
+ }
+ if ((t.getPartitionKeys() == null || t.getPartitionKeys().isEmpty())
+ && !endPoint.partitionVals.isEmpty()) {
+ // Invalid if table is not partitioned, but endPoint's partitionVals is not empty
+ String errMsg = "HiveEndPoint" + endPoint + " specifies partitions for unpartitioned table";
+ LOG.error(errMsg);
+ throw new ConnectionError(errMsg);
+ }
+ }
+
+ /**
+ * Close connection
+ */
+ @Override
+ public void close() {
+ if (ugi==null) {
+ msClient.close();
+ heartbeaterMSClient.close();
+ return;
+ }
+ try {
+ ugi.doAs (
+ new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ msClient.close();
+ heartbeaterMSClient.close();
+ return null;
+ }
+ } );
+ try {
+ FileSystem.closeAllForUGI(ugi);
+ } catch (IOException exception) {
+ LOG.error("Could not clean up file-system handles for UGI: " + ugi, exception);
+ }
+ } catch (IOException e) {
+ LOG.error("Error closing connection to " + endPt, e);
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted when closing connection to " + endPt, e);
+ }
+ }
+
+ @Override
+ public UserGroupInformation getUserGroupInformation() {
+ return ugi;
+ }
+
+ /**
+ * Acquires a new batch of transactions from Hive.
+ *
+ * @param numTransactions is a hint from client indicating how many transactions client needs.
+ * @param recordWriter Used to write record. The same writer instance can
+ * be shared with another TransactionBatch (to the same endpoint)
+ * only after the first TransactionBatch has been closed.
+ * Writer will be closed when the TransactionBatch is closed.
+ * @return
+ * @throws StreamingIOFailure if failed to create new RecordUpdater for batch
+ * @throws TransactionBatchUnAvailable if failed to acquire a new Transaction batch
+ * @throws ImpersonationFailed failed to run command as proxyUser
+ * @throws InterruptedException
+ */
+ @Override
+ public TransactionBatch fetchTransactionBatch(final int numTransactions,
+ final RecordWriter recordWriter)
+ throws StreamingException, TransactionBatchUnAvailable, ImpersonationFailed
+ , InterruptedException {
+ if (ugi==null) {
+ return fetchTransactionBatchImpl(numTransactions, recordWriter);
+ }
+ try {
+ return ugi.doAs (
+ new PrivilegedExceptionAction<TransactionBatch>() {
+ @Override
+ public TransactionBatch run() throws StreamingException, InterruptedException {
+ return fetchTransactionBatchImpl(numTransactions, recordWriter);
+ }
+ }
+ );
+ } catch (IOException e) {
+ throw new ImpersonationFailed("Failed to fetch Txn Batch as user '" + ugi.getShortUserName()
+ + "' when acquiring Transaction Batch on endPoint " + endPt, e);
+ }
+ }
+
+ private TransactionBatch fetchTransactionBatchImpl(int numTransactions,
+ RecordWriter recordWriter)
+ throws StreamingException, TransactionBatchUnAvailable, InterruptedException {
+ return new TransactionBatchImpl(username, ugi, endPt, numTransactions, msClient,
+ heartbeaterMSClient, recordWriter, agentInfo);
+ }
+
+
+ private static void createPartitionIfNotExists(HiveEndPoint ep,
+ IMetaStoreClient msClient, HiveConf conf)
+ throws InvalidTable, PartitionCreationFailed {
+ if (ep.partitionVals.isEmpty()) {
+ return;
+ }
+ SessionState localSession = null;
+ if(SessionState.get() == null) {
+ localSession = SessionState.start(new CliSessionState(conf));
+ }
+ IDriver driver = DriverFactory.newDriver(conf);
+
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Attempting to create partition (if not existent) " + ep);
+ }
+
+ List<FieldSchema> partKeys = msClient.getTable(ep.database, ep.table)
+ .getPartitionKeys();
+ runDDL(driver, "use " + ep.database);
+ String query = "alter table " + ep.table + " add if not exists partition "
+ + partSpecStr(partKeys, ep.partitionVals);
+ runDDL(driver, query);
+ } catch (MetaException e) {
+ LOG.error("Failed to create partition : " + ep, e);
+ throw new PartitionCreationFailed(ep, e);
+ } catch (NoSuchObjectException e) {
+ LOG.error("Failed to create partition : " + ep, e);
+ throw new InvalidTable(ep.database, ep.table);
+ } catch (TException e) {
+ LOG.error("Failed to create partition : " + ep, e);
+ throw new PartitionCreationFailed(ep, e);
+ } catch (QueryFailedException e) {
+ LOG.error("Failed to create partition : " + ep, e);
+ throw new PartitionCreationFailed(ep, e);
+ } finally {
+ driver.close();
+ try {
+ if(localSession != null) {
+ localSession.close();
+ }
+ } catch (IOException e) {
+ LOG.warn("Error closing SessionState used to run Hive DDL.");
+ }
+ }
+ }
+
+ private static boolean runDDL(IDriver driver, String sql) throws QueryFailedException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Running Hive Query: " + sql);
+ }
+ driver.run(sql);
+ return true;
+ }
+
+ private static String partSpecStr(List<FieldSchema> partKeys, ArrayList<String> partVals) {
+ if (partKeys.size()!=partVals.size()) {
+ throw new IllegalArgumentException("Partition values:" + partVals +
+ ", does not match the partition Keys in table :" + partKeys );
+ }
+ StringBuilder buff = new StringBuilder(partKeys.size()*20);
+ buff.append(" ( ");
+ int i=0;
+ for (FieldSchema schema : partKeys) {
+ buff.append(schema.getName());
+ buff.append("='");
+ buff.append(partVals.get(i));
+ buff.append("'");
+ if (i!=partKeys.size()-1) {
+ buff.append(",");
+ }
+ ++i;
+ }
+ buff.append(" )");
+ return buff.toString();
+ }
+
+ private static IMetaStoreClient getMetaStoreClient(HiveEndPoint endPoint, HiveConf conf, boolean secureMode)
+ throws ConnectionError {
+
+ if (endPoint.metaStoreUri!= null) {
+ conf.setVar(HiveConf.ConfVars.METASTOREURIS, endPoint.metaStoreUri);
+ }
+ if(secureMode) {
+ conf.setBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL,true);
+ }
+ try {
+ return HCatUtil.getHiveMetastoreClient(conf);
+ } catch (MetaException e) {
+ throw new ConnectionError("Error connecting to Hive Metastore URI: "
+ + endPoint.metaStoreUri + ". " + e.getMessage(), e);
+ } catch (IOException e) {
+ throw new ConnectionError("Error connecting to Hive Metastore URI: "
+ + endPoint.metaStoreUri + ". " + e.getMessage(), e);
+ }
+ }
+ } // class ConnectionImpl
+
+ private static class TransactionBatchImpl implements TransactionBatch {
+ private final String username;
+ private final UserGroupInformation ugi;
+ private final HiveEndPoint endPt;
+ private final IMetaStoreClient msClient;
+ private final IMetaStoreClient heartbeaterMSClient;
+ private final RecordWriter recordWriter;
+ private final List<TxnToWriteId> txnToWriteIds;
+
+ //volatile because heartbeat() may be in a "different" thread; updates of this are "piggybacking"
+ private volatile int currentTxnIndex = -1;
+ private final String partNameForLock;
+ //volatile because heartbeat() may be in a "different" thread
+ private volatile TxnState state;
+ private LockRequest lockRequest = null;
+ /**
+ * once any operation on this batch encounters a system exception
+ * (e.g. IOException on write) it's safest to assume that we can't write to the
+ * file backing this batch any more. This guards important public methods
+ */
+ private volatile boolean isClosed = false;
+ private final String agentInfo;
+ /**
+ * Tracks the state of each transaction
+ */
+ private final TxnState[] txnStatus;
+ /**
+ * ID of the last txn used by {@link #beginNextTransactionImpl()}
+ */
+ private long lastTxnUsed;
+
+ /**
+ * Represents a batch of transactions acquired from MetaStore
+ *
+ * @throws StreamingException if failed to create new RecordUpdater for batch
+ * @throws TransactionBatchUnAvailable if failed to acquire a new Transaction batch
+ */
+ private TransactionBatchImpl(final String user, UserGroupInformation ugi, HiveEndPoint endPt,
+ final int numTxns, final IMetaStoreClient msClient,
+ final IMetaStoreClient heartbeaterMSClient, RecordWriter recordWriter, String agentInfo)
+ throws StreamingException, TransactionBatchUnAvailable, InterruptedException {
+ boolean success = false;
+ try {
+ if ( endPt.partitionVals!=null && !endPt.partitionVals.isEmpty() ) {
+ Table tableObj = msClient.getTable(endPt.database, endPt.table);
+ List<FieldSchema> partKeys = tableObj.getPartitionKeys();
+ partNameForLock = Warehouse.makePartName(partKeys, endPt.partitionVals);
+ } else {
+ partNameForLock = null;
+ }
+ this.username = user;
+ this.ugi = ugi;
+ this.endPt = endPt;
+ this.msClient = msClient;
+ this.heartbeaterMSClient = heartbeaterMSClient;
+ this.recordWriter = recordWriter;
+ this.agentInfo = agentInfo;
+
+ List<Long> txnIds = openTxnImpl(msClient, user, numTxns, ugi);
+ txnToWriteIds = allocateWriteIdsImpl(msClient, txnIds, ugi);
+ assert(txnToWriteIds.size() == numTxns);
+
+ txnStatus = new TxnState[numTxns];
+ for(int i = 0; i < txnStatus.length; i++) {
+ assert(txnToWriteIds.get(i).getTxnId() == txnIds.get(i));
+ txnStatus[i] = TxnState.OPEN;//Open matches Metastore state
+ }
+ this.state = TxnState.INACTIVE;
+
+ // The Write Ids returned for the transaction batch is also sequential
+ recordWriter.newBatch(txnToWriteIds.get(0).getWriteId(), txnToWriteIds.get(numTxns-1).getWriteId());
+ success = true;
+ } catch (TException e) {
+ throw new TransactionBatchUnAvailable(endPt, e);
+ } catch (IOException e) {
+ throw new TransactionBatchUnAvailable(endPt, e);
+ }
+ finally {
+ //clean up if above throws
+ markDead(success);
+ }
+ }
+
+ private List<Long> openTxnImpl(final IMetaStoreClient msClient, final String user, final int numTxns, UserGroupInformation ugi)
+ throws IOException, TException, InterruptedException {
+ if(ugi==null) {
+ return msClient.openTxns(user, numTxns).getTxn_ids();
+ }
+ return (List<Long>) ugi.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws Exception {
+ return msClient.openTxns(user, numTxns).getTxn_ids();
+ }
+ });
+ }
+
+ private List<TxnToWriteId> allocateWriteIdsImpl(final IMetaStoreClient msClient,
+ final List<Long> txnIds, UserGroupInformation ugi)
+ throws IOException, TException, InterruptedException {
+ if(ugi==null) {
+ return msClient.allocateTableWriteIdsBatch(txnIds, endPt.database, endPt.table);
+ }
+ return (List<TxnToWriteId>) ugi.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws Exception {
+ return msClient.allocateTableWriteIdsBatch(txnIds, endPt.database, endPt.table);
+ }
+ });
+ }
+
+ @Override
+ public String toString() {
+ if (txnToWriteIds==null || txnToWriteIds.isEmpty()) {
+ return "{}";
+ }
+ StringBuilder sb = new StringBuilder(" TxnStatus[");
+ for(TxnState state : txnStatus) {
+ //'state' should not be null - future proofing
+ sb.append(state == null ? "N" : state);
+ }
+ sb.append("] LastUsed ").append(JavaUtils.txnIdToString(lastTxnUsed));
+ return "TxnId/WriteIds=[" + txnToWriteIds.get(0).getTxnId()
+ + "/" + txnToWriteIds.get(0).getWriteId()
+ + "..."
+ + txnToWriteIds.get(txnToWriteIds.size()-1).getTxnId()
+ + "/" + txnToWriteIds.get(txnToWriteIds.size()-1).getWriteId()
+ + "] on endPoint = " + endPt + "; " + sb;
+ }
+
+ /**
+ * Activate the next available transaction in the current transaction batch
+ * @throws TransactionError failed to switch to next transaction
+ */
+ @Override
+ public void beginNextTransaction() throws TransactionError, ImpersonationFailed,
+ InterruptedException {
+ checkIsClosed();
+ if (ugi==null) {
+ beginNextTransactionImpl();
+ return;
+ }
+ try {
+ ugi.doAs (
+ new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws TransactionError {
+ beginNextTransactionImpl();
+ return null;
+ }
+ }
+ );
+ } catch (IOException e) {
+ throw new ImpersonationFailed("Failed switching to next Txn as user '" + username +
+ "' in Txn batch :" + this, e);
+ }
+ }
+
+ private void beginNextTransactionImpl() throws TransactionError {
+ state = TxnState.INACTIVE;//clear state from previous txn
+
+ if ((currentTxnIndex + 1) >= txnToWriteIds.size()) {
+ throw new InvalidTrasactionState("No more transactions available in" +
+ " current batch for end point : " + endPt);
+ }
+ ++currentTxnIndex;
+ state = TxnState.OPEN;
+ lastTxnUsed = getCurrentTxnId();
+ lockRequest = createLockRequest(endPt, partNameForLock, username, getCurrentTxnId(), agentInfo);
+ try {
+ LockResponse res = msClient.lock(lockRequest);
+ if (res.getState() != LockState.ACQUIRED) {
+ throw new TransactionError("Unable to acquire lock on " + endPt);
+ }
+ } catch (TException e) {
+ throw new TransactionError("Unable to acquire lock on " + endPt, e);
+ }
+ }
+
+ /**
+ * Get Id of currently open transaction.
+ * @return -1 if there is no open TX
+ */
+ @Override
+ public Long getCurrentTxnId() {
+ if (currentTxnIndex >= 0) {
+ return txnToWriteIds.get(currentTxnIndex).getTxnId();
+ }
+ return -1L;
+ }
+
+ /**
+ * Get write Id of the currently open transaction.
+ * @return -1 if there is no open TX
+ */
+ @Override
+ public Long getCurrentWriteId() {
+ if (currentTxnIndex >= 0) {
+ return txnToWriteIds.get(currentTxnIndex).getWriteId();
+ }
+ return -1L;
+ }
+
+ /**
+ * get state of current transaction
+ * @return
+ */
+ @Override
+ public TxnState getCurrentTransactionState() {
+ return state;
+ }
+
+ /**
+ * Remaining transactions are the ones that are not committed, aborted, or active.
+ * The active transaction is not considered part of the remaining txns.
+ * @return number of transactions remaining in this batch.
+ */
+ @Override
+ public int remainingTransactions() {
+ if (currentTxnIndex>=0) {
+ return txnToWriteIds.size() - currentTxnIndex -1;
+ }
+ return txnToWriteIds.size();
+ }
+
+
+ /**
+ * Write record using RecordWriter
+ * @param record the data to be written
+ * @throws StreamingIOFailure I/O failure
+ * @throws SerializationError serialization error
+ * @throws ImpersonationFailed error writing on behalf of proxyUser
+ * @throws InterruptedException
+ */
+ @Override
+ public void write(final byte[] record)
+ throws StreamingException, InterruptedException {
+ write(Collections.singletonList(record));
+ }
+ private void checkIsClosed() throws IllegalStateException {
+ if(isClosed) {
+ throw new IllegalStateException("TransactionBatch " + toString() + " has been closed()");
+ }
+ }
+ /**
+ * A transaction batch opens a single HDFS file and writes multiple transactions to it. If there is any issue
+ * with the write, we can't continue to write to the same file any more as it may be corrupted now (at the tail).
+ * This ensures that a client can't ignore these failures and continue to write.
+ */
+ private void markDead(boolean success) {
+ if(success) {
+ return;
+ }
+ isClosed = true;//also ensures that heartbeat() is no-op since client is likely doing it async
+ try {
+ abort(true);//abort all remaining txns
+ }
+ catch(Exception ex) {
+ LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex);
+ }
+ try {
+ closeImpl();
+ }
+ catch (Exception ex) {
+ LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex);
+ }
+ }
+
+
+ /**
+ * Write records using RecordWriter
+ * @param records collection of rows to be written
+ * @throws StreamingException serialization error
+ * @throws ImpersonationFailed error writing on behalf of proxyUser
+ * @throws InterruptedException
+ */
+ @Override
+ public void write(final Collection<byte[]> records)
+ throws StreamingException, InterruptedException,
+ ImpersonationFailed {
+ checkIsClosed();
+ boolean success = false;
+ try {
+ if (ugi == null) {
+ writeImpl(records);
+ } else {
+ ugi.doAs(
+ new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws StreamingException {
+ writeImpl(records);
+ return null;
+ }
+ }
+ );
+ }
+ success = true;
+ } catch(SerializationError ex) {
+ //this exception indicates that a {@code record} could not be parsed and the
+ //caller can decide whether to drop it or send it to dead letter queue.
+ //rolling back the txn and retrying won't help since the tuple will be exactly the same
+ //when it's replayed.
+ success = true;
+ throw ex;
+ } catch(IOException e){
+ throw new ImpersonationFailed("Failed writing as user '" + username +
+ "' to endPoint :" + endPt + ". Transaction Id: "
+ + getCurrentTxnId(), e);
+ }
+ finally {
+ markDead(success);
+ }
+ }
+
+ private void writeImpl(Collection<byte[]> records)
+ throws StreamingException {
+ for (byte[] record : records) {
+ recordWriter.write(getCurrentWriteId(), record);
+ }
+ }
+
+
+ /**
+ * Commit the currently open transaction
+ * @throws TransactionError
+ * @throws StreamingIOFailure if flushing records failed
+ * @throws ImpersonationFailed error committing on behalf of proxyUser
+ * @throws InterruptedException
+ */
+ @Override
+ public void commit() throws TransactionError, StreamingException,
+ ImpersonationFailed, InterruptedException {
+ checkIsClosed();
+ boolean success = false;
+ try {
+ if (ugi == null) {
+ commitImpl();
+ }
+ else {
+ ugi.doAs(
+ new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws StreamingException {
+ commitImpl();
+ return null;
+ }
+ }
+ );
+ }
+ success = true;
+ } catch (IOException e) {
+ throw new ImpersonationFailed("Failed committing Txn ID " + getCurrentTxnId() + " as user '"
+ + username + "'on endPoint :" + endPt + ". Transaction Id: ", e);
+ }
+ finally {
+ markDead(success);
+ }
+ }
+
+ private void commitImpl() throws TransactionError, StreamingException {
+ try {
+ recordWriter.flush();
+ msClient.commitTxn(txnToWriteIds.get(currentTxnIndex).getTxnId());
+ state = TxnState.COMMITTED;
+ txnStatus[currentTxnIndex] = TxnState.COMMITTED;
+ } catch (NoSuchTxnException e) {
+ throw new TransactionError("Invalid transaction id : "
+ + getCurrentTxnId(), e);
+ } catch (TxnAbortedException e) {
+ throw new TransactionError("Aborted transaction cannot be committed"
+ , e);
+ } catch (TException e) {
+ throw new TransactionError("Unable to commit transaction"
+ + getCurrentTxnId(), e);
+ }
+ }
+
+ /**
+ * Abort the currently open transaction
+ * @throws TransactionError
+ */
+ @Override
+ public void abort() throws TransactionError, StreamingException
+ , ImpersonationFailed, InterruptedException {
+ if(isClosed) {
+ /**
+ * isClosed is only set internally by this class. {@link #markDead(boolean)} will abort all
+ * remaining txns, so make this a no-op to ensure that a well-behaved client that calls
+ * abort() doesn't get misleading errors
+ */
+ return;
+ }
+ abort(false);
+ }
+ private void abort(final boolean abortAllRemaining) throws TransactionError, StreamingException
+ , ImpersonationFailed, InterruptedException {
+ if (ugi==null) {
+ abortImpl(abortAllRemaining);
+ return;
+ }
+ try {
+ ugi.doAs (
+ new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws StreamingException {
+ abortImpl(abortAllRemaining);
+ return null;
+ }
+ }
+ );
+ } catch (IOException e) {
+ throw new ImpersonationFailed("Failed aborting Txn " + getCurrentTxnId() + " as user '"
+ + username + "' on endPoint :" + endPt, e);
+ }
+ }
+
+ private void abortImpl(boolean abortAllRemaining) throws TransactionError, StreamingException {
+ try {
+ if(abortAllRemaining) {
+ //when last txn finished (abort/commit) the currentTxnIndex is pointing at that txn
+ //so we need to start from next one, if any. Also if batch was created but
+ //fetchTransactionBatch() was never called, we want to start with first txn
+ int minOpenTxnIndex = Math.max(currentTxnIndex +
+ (state == TxnState.ABORTED || state == TxnState.COMMITTED ? 1 : 0), 0);
+ for(currentTxnIndex = minOpenTxnIndex;
+ currentTxnIndex < txnToWriteIds.size(); currentTxnIndex++) {
+ msClient.rollbackTxn(txnToWriteIds.get(currentTxnIndex).getTxnId());
+ txnStatus[currentTxnIndex] = TxnState.ABORTED;
+ }
+ currentTxnIndex--;//since the loop left it == txnToWriteIds.size()
+ }
+ else {
+ if (getCurrentTxnId() > 0) {
+ msClient.rollbackTxn(getCurrentTxnId());
+ txnStatus[currentTxnIndex] = TxnState.ABORTED;
+ }
+ }
+ state = TxnState.ABORTED;
+ recordWriter.clear();
+ } catch (NoSuchTxnException e) {
+ throw new TransactionError("Unable to abort invalid transaction id : "
+ + getCurrentTxnId(), e);
+ } catch (TException e) {
+ throw new TransactionError("Unable to abort transaction id : "
+ + getCurrentTxnId(), e);
+ }
+ }
+
+ @Override
+ public void heartbeat() throws StreamingException, HeartBeatFailure {
+ if(isClosed) {
+ return;
+ }
+ if(state != TxnState.OPEN && currentTxnIndex >= txnToWriteIds.size() - 1) {
+ //here means last txn in the batch is resolved but the close() hasn't been called yet so
+ //there is nothing to heartbeat
+ return;
+ }
+ //if here after commit()/abort() but before next beginNextTransaction(), currentTxnIndex still
+ //points at the last txn which we don't want to heartbeat
+ Long first = txnToWriteIds.get(state == TxnState.OPEN ? currentTxnIndex : currentTxnIndex + 1).getTxnId();
+ Long last = txnToWriteIds.get(txnToWriteIds.size()-1).getTxnId();
+ try {
+ HeartbeatTxnRangeResponse resp = heartbeaterMSClient.heartbeatTxnRange(first, last);
+ if (!resp.getAborted().isEmpty() || !resp.getNosuch().isEmpty()) {
+ throw new HeartBeatFailure(resp.getAborted(), resp.getNosuch());
+ }
+ } catch (TException e) {
+ throw new StreamingException("Failure to heartbeat on ids (" + first + "src/gen/thrift"
+ + last + ") on end point : " + endPt );
+ }
+ }
+
+ @Override
+ public boolean isClosed() {
+ return isClosed;
+ }
+ /**
+ * Close the TransactionBatch. This will abort any still open txns in this batch.
+ * @throws StreamingIOFailure I/O failure when closing transaction batch
+ */
+ @Override
+ public void close() throws StreamingException, ImpersonationFailed, InterruptedException {
+ if(isClosed) {
+ return;
+ }
+ isClosed = true;
+ abortImpl(true);//abort proactively so that we don't wait for timeout
+ closeImpl();//perhaps we should add a version of RecordWriter.closeBatch(boolean abort) which
+ //will call RecordUpdater.close(boolean abort)
+ }
+ private void closeImpl() throws StreamingException, InterruptedException{
+ state = TxnState.INACTIVE;
+ if(ugi == null) {
+ recordWriter.closeBatch();
+ return;
+ }
+ try {
+ ugi.doAs (
+ new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws StreamingException {
+ recordWriter.closeBatch();
+ return null;
+ }
+ }
+ );
+ try {
+ FileSystem.closeAllForUGI(ugi);
+ } catch (IOException exception) {
+ LOG.error("Could not clean up file-system handles for UGI: " + ugi, exception);
+ }
+ } catch (IOException e) {
+ throw new ImpersonationFailed("Failed closing Txn Batch as user '" + username +
+ "' on endPoint :" + endPt, e);
+ }
+ }
+
+ private static LockRequest createLockRequest(final HiveEndPoint hiveEndPoint,
+ String partNameForLock, String user, long txnId, String agentInfo) {
+ LockRequestBuilder rqstBuilder = agentInfo == null ?
+ new LockRequestBuilder() : new LockRequestBuilder(agentInfo);
+ rqstBuilder.setUser(user);
+ rqstBuilder.setTransactionId(txnId);
+
+ LockComponentBuilder lockCompBuilder = new LockComponentBuilder()
+ .setDbName(hiveEndPoint.database)
+ .setTableName(hiveEndPoint.table)
+ .setShared()
+ .setOperationType(DataOperationType.INSERT);
+ if (partNameForLock!=null && !partNameForLock.isEmpty() ) {
+ lockCompBuilder.setPartitionName(partNameForLock);
+ }
+ rqstBuilder.addLockComponent(lockCompBuilder.build());
+
+ return rqstBuilder.build();
+ }
+ } // class TransactionBatchImpl
+
+ static HiveConf createHiveConf(Class<?> clazz, String metaStoreUri) {
+ HiveConf conf = new HiveConf(clazz);
+ if (metaStoreUri!= null) {
+ setHiveConf(conf, HiveConf.ConfVars.METASTOREURIS, metaStoreUri);
+ }
+ HiveEndPoint.overrideConfSettings(conf);
+ return conf;
+ }
+
+ private static void overrideConfSettings(HiveConf conf) {
+ setHiveConf(conf, HiveConf.ConfVars.HIVE_TXN_MANAGER,
+ "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+ setHiveConf(conf, HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
+ setHiveConf(conf, HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true);
+ // Avoids creating Tez Client sessions internally as it takes much longer currently
+ setHiveConf(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "mr");
+ }
+
+ private static void setHiveConf(HiveConf conf, HiveConf.ConfVars var, String value) {
+ if( LOG.isDebugEnabled() ) {
+ LOG.debug("Overriding HiveConf setting : " + var + " = " + value);
+ }
+ conf.setVar(var, value);
+ }
+
+ private static void setHiveConf(HiveConf conf, HiveConf.ConfVars var, boolean value) {
+ if( LOG.isDebugEnabled() ) {
+ LOG.debug("Overriding HiveConf setting : " + var + " = " + value);
+ }
+ conf.setBoolVar(var, value);
+ }
+
+} // class HiveEndPoint
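
For orientation, a minimal end-to-end sketch of how this class is meant to be driven (illustrative only, not part of the patch). It assumes the HiveEndPoint constructor and the newConnection(boolean, String agentInfo) overload carried over from the hcatalog streaming API, plus an unpartitioned transactional table default.alerts (id INT, msg STRING) and a local metastore URI:

import java.nio.charset.StandardCharsets;

import org.apache.hive.streaming.HiveEndPoint;
import org.apache.hive.streaming.StreamingConnection;
import org.apache.hive.streaming.StrictJsonWriter;
import org.apache.hive.streaming.TransactionBatch;

public class StreamingIngestSketch {
  public static void main(String[] args) throws Exception {
    // endpoint for an unpartitioned table; metastore URI and table name are illustrative
    HiveEndPoint endPt =
        new HiveEndPoint("thrift://localhost:9083", "default", "alerts", null);
    StreamingConnection conn = endPt.newConnection(true, "ingest-example-agent");
    StrictJsonWriter writer = new StrictJsonWriter(endPt, conn);
    TransactionBatch batch = conn.fetchTransactionBatch(10, writer);
    try {
      while (batch.remainingTransactions() > 0) {
        batch.beginNextTransaction();
        batch.write("{\"id\": 1, \"msg\": \"hello\"}".getBytes(StandardCharsets.UTF_8));
        batch.commit();
      }
    } finally {
      batch.close();  // aborts any still-open txns in the batch
      conn.close();
    }
  }
}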
http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/ImpersonationFailed.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/ImpersonationFailed.java b/streaming/src/java/org/apache/hive/streaming/ImpersonationFailed.java
new file mode 100644
index 0000000..23e17e7
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/ImpersonationFailed.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class ImpersonationFailed extends StreamingException {
+ public ImpersonationFailed(String username, Exception e) {
+ super("Failed to impersonate user " + username, e);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/InvalidColumn.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/InvalidColumn.java b/streaming/src/java/org/apache/hive/streaming/InvalidColumn.java
new file mode 100644
index 0000000..0011b14
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/InvalidColumn.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class InvalidColumn extends StreamingException {
+
+ public InvalidColumn(String msg) {
+ super(msg);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/InvalidPartition.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/InvalidPartition.java b/streaming/src/java/org/apache/hive/streaming/InvalidPartition.java
new file mode 100644
index 0000000..f1f9804
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/InvalidPartition.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class InvalidPartition extends StreamingException {
+
+ public InvalidPartition(String partitionName, String partitionValue) {
+ super("Invalid partition: Name=" + partitionName +
+ ", Value=" + partitionValue);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/InvalidTable.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/InvalidTable.java b/streaming/src/java/org/apache/hive/streaming/InvalidTable.java
new file mode 100644
index 0000000..ef1c91d
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/InvalidTable.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class InvalidTable extends StreamingException {
+
+ private static String makeMsg(String db, String table) {
+ return "Invalid table db:" + db + ", table:" + table;
+ }
+
+ public InvalidTable(String db, String table) {
+ super(makeMsg(db,table), null);
+ }
+
+ public InvalidTable(String db, String table, String msg) {
+ super(makeMsg(db, table) + ": " + msg, null);
+ }
+
+ public InvalidTable(String db, String table, Exception inner) {
+ super(makeMsg(db, table) + ": " + inner.getMessage(), inner);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/InvalidTrasactionState.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/InvalidTrasactionState.java b/streaming/src/java/org/apache/hive/streaming/InvalidTrasactionState.java
new file mode 100644
index 0000000..762f5f8
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/InvalidTrasactionState.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class InvalidTrasactionState extends TransactionError {
+ public InvalidTrasactionState(String msg) {
+ super(msg);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/PartitionCreationFailed.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/PartitionCreationFailed.java b/streaming/src/java/org/apache/hive/streaming/PartitionCreationFailed.java
new file mode 100644
index 0000000..5f9aca6
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/PartitionCreationFailed.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class PartitionCreationFailed extends StreamingException {
+ public PartitionCreationFailed(HiveEndPoint endPoint, Exception cause) {
+ super("Failed to create partition " + endPoint, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/QueryFailedException.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/QueryFailedException.java b/streaming/src/java/org/apache/hive/streaming/QueryFailedException.java
new file mode 100644
index 0000000..ccd3ae0
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/QueryFailedException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class QueryFailedException extends StreamingException {
+ String query;
+
+ public QueryFailedException(String query, Exception e) {
+ super("Query failed: " + query + ". Due to :" + e.getMessage(), e);
+ this.query = query;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/RecordWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/RecordWriter.java b/streaming/src/java/org/apache/hive/streaming/RecordWriter.java
new file mode 100644
index 0000000..dc6d70e
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/RecordWriter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+
+public interface RecordWriter {
+
+ /** Writes using a hive RecordUpdater
+ *
+ * @param writeId the write ID of the table, mapped to the transaction in which the write occurs
+ * @param record the record to be written
+ */
+ void write(long writeId, byte[] record) throws StreamingException;
+
+ /** Flush records from buffer. Invoked by TransactionBatch.commit() */
+ void flush() throws StreamingException;
+
+ /** Clear buffered writes. Invoked by TransactionBatch.abort() */
+ void clear() throws StreamingException;
+
+ /** Acquire a new RecordUpdater. Invoked when
+ * StreamingConnection.fetchTransactionBatch() is called */
+ void newBatch(Long minWriteId, Long maxWriteID) throws StreamingException;
+
+ /** Close the RecordUpdater. Invoked by TransactionBatch.close() */
+ void closeBatch() throws StreamingException;
+}
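
As a reference for implementors, a hedged sketch of a trivial RecordWriter (not part of the patch) that only logs the lifecycle calls in the order the interface above documents: newBatch(), then write(), then flush()/clear(), then closeBatch().

package org.apache.hive.streaming;

public class LoggingRecordWriter implements RecordWriter {
  @Override
  public void write(long writeId, byte[] record) throws StreamingException {
    System.out.println("write id " + writeId + ": " + record.length + " bytes");
  }

  @Override
  public void flush() throws StreamingException {
    System.out.println("flush (invoked by TransactionBatch.commit())");
  }

  @Override
  public void clear() throws StreamingException {
    System.out.println("clear (invoked by TransactionBatch.abort())");
  }

  @Override
  public void newBatch(Long minWriteId, Long maxWriteID) throws StreamingException {
    System.out.println("newBatch for write ids [" + minWriteId + ", " + maxWriteID + "]");
  }

  @Override
  public void closeBatch() throws StreamingException {
    System.out.println("closeBatch (invoked by TransactionBatch.close())");
  }
}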
http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/SerializationError.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/SerializationError.java b/streaming/src/java/org/apache/hive/streaming/SerializationError.java
new file mode 100644
index 0000000..a57ba00
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/SerializationError.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+
+public class SerializationError extends StreamingException {
+ public SerializationError(String msg, Exception e) {
+ super(msg,e);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
new file mode 100644
index 0000000..2f760ea
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Represents a connection to a HiveEndPoint. Used to acquire transaction batches.
+ * Note: the expectation is that there is at most 1 TransactionBatch outstanding for any given
+ * StreamingConnection. Violating this may result in "out of sequence response".
+ */
+public interface StreamingConnection {
+
+ /**
+ * Acquires a new batch of transactions from Hive.
+ *
+ * @param numTransactionsHint a hint from the client indicating how many transactions it needs.
+ * @param writer Used to write record. The same writer instance can
+ * be shared with another TransactionBatch (to the same endpoint)
+ * only after the first TransactionBatch has been closed.
+ * Writer will be closed when the TransactionBatch is closed.
+ * @return a batch of transactions
+ * @throws ConnectionError
+ * @throws InvalidPartition
+ * @throws StreamingException
+ */
+ public TransactionBatch fetchTransactionBatch(int numTransactionsHint,
+ RecordWriter writer)
+ throws ConnectionError, StreamingException, InterruptedException;
+
+ /**
+ * Close connection
+ */
+ public void close();
+
+ /**
+ * @return UserGroupInformation associated with this connection or {@code null} if there is none
+ */
+ UserGroupInformation getUserGroupInformation();
+}
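
To illustrate the "at most 1 TransactionBatch outstanding" expectation above, a hedged sketch (not part of the patch) of draining a record feed through successive batches on one connection; the Iterator is a stand-in for whatever source the client actually reads from:

package org.apache.hive.streaming;

import java.util.Iterator;

public class BatchLoopSketch {
  public static void drain(StreamingConnection conn, RecordWriter writer,
      Iterator<byte[]> records) throws StreamingException, InterruptedException {
    while (records.hasNext()) {
      TransactionBatch batch = conn.fetchTransactionBatch(100, writer);
      try {
        while (batch.remainingTransactions() > 0 && records.hasNext()) {
          batch.beginNextTransaction();
          batch.write(records.next());
          batch.commit();
        }
      } finally {
        // close the current batch before fetching the next one
        batch.close();
      }
    }
  }
}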
http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/StreamingException.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/StreamingException.java b/streaming/src/java/org/apache/hive/streaming/StreamingException.java
new file mode 100644
index 0000000..a7f84c1
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/StreamingException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class StreamingException extends Exception {
+ public StreamingException(String msg, Exception cause) {
+ super(msg, cause);
+ }
+ public StreamingException(String msg) {
+ super(msg);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/StreamingIOFailure.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/StreamingIOFailure.java b/streaming/src/java/org/apache/hive/streaming/StreamingIOFailure.java
new file mode 100644
index 0000000..0dfbfa7
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/StreamingIOFailure.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+
+public class StreamingIOFailure extends StreamingException {
+
+ public StreamingIOFailure(String msg, Exception cause) {
+ super(msg, cause);
+ }
+
+ public StreamingIOFailure(String msg) {
+ super(msg);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java b/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
new file mode 100644
index 0000000..0077913
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.io.Text;
+import org.apache.hive.hcatalog.data.HCatRecordObjectInspector;
+import org.apache.hive.hcatalog.data.JsonSerDe;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Streaming writer that handles UTF-8 encoded JSON (strict syntax).
+ * Uses org.apache.hive.hcatalog.data.JsonSerDe to process JSON input.
+ */
+public class StrictJsonWriter extends AbstractRecordWriter {
+ private JsonSerDe serde;
+
+ private final HCatRecordObjectInspector recordObjInspector;
+ private final ObjectInspector[] bucketObjInspectors;
+ private final StructField[] bucketStructFields;
+
+ /**
+ * @deprecated As of release 1.3/2.1. Replaced by {@link #StrictJsonWriter(HiveEndPoint, HiveConf, StreamingConnection)}
+ */
+ public StrictJsonWriter(HiveEndPoint endPoint)
+ throws ConnectionError, SerializationError, StreamingException {
+ this(endPoint, null, null);
+ }
+
+ /**
+ * @deprecated As of release 1.3/2.1. Replaced by {@link #StrictJsonWriter(HiveEndPoint, HiveConf, StreamingConnection)}
+ */
+ public StrictJsonWriter(HiveEndPoint endPoint, HiveConf conf) throws StreamingException {
+ this(endPoint, conf, null);
+ }
+ /**
+ * @param endPoint the end point to write to
+ * @throws ConnectionError
+ * @throws SerializationError
+ * @throws StreamingException
+ */
+ public StrictJsonWriter(HiveEndPoint endPoint, StreamingConnection conn)
+ throws ConnectionError, SerializationError, StreamingException {
+ this(endPoint, null, conn);
+ }
+ /**
+ * @param endPoint the end point to write to
+ * @param conf a Hive conf object. Should be null if not using advanced Hive settings.
+ * @param conn connection this Writer is to be used with
+ * @throws ConnectionError
+ * @throws SerializationError
+ * @throws StreamingException
+ */
+ public StrictJsonWriter(HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
+ throws ConnectionError, SerializationError, StreamingException {
+ super(endPoint, conf, conn);
+ this.serde = createSerde(tbl, conf);
+ // get ObjInspectors for entire record and bucketed cols
+ try {
+ recordObjInspector = ( HCatRecordObjectInspector ) serde.getObjectInspector();
+ this.bucketObjInspectors = getObjectInspectorsForBucketedCols(bucketIds, recordObjInspector);
+ } catch (SerDeException e) {
+ throw new SerializationError("Unable to get ObjectInspector for bucket columns", e);
+ }
+
+ // get StructFields for bucketed cols
+ bucketStructFields = new StructField[bucketIds.size()];
+ List<? extends StructField> allFields = recordObjInspector.getAllStructFieldRefs();
+ for (int i = 0; i < bucketIds.size(); i++) {
+ bucketStructFields[i] = allFields.get(bucketIds.get(i));
+ }
+ }
+
+ @Override
+ public AbstractSerDe getSerde() {
+ return serde;
+ }
+
+ protected HCatRecordObjectInspector getRecordObjectInspector() {
+ return recordObjInspector;
+ }
+
+ @Override
+ protected StructField[] getBucketStructFields() {
+ return bucketStructFields;
+ }
+
+ protected ObjectInspector[] getBucketObjectInspectors() {
+ return bucketObjInspectors;
+ }
+
+
+ @Override
+ public void write(long writeId, byte[] record)
+ throws StreamingIOFailure, SerializationError {
+ try {
+ Object encodedRow = encode(record);
+ int bucket = getBucket(encodedRow);
+ getRecordUpdater(bucket).insert(writeId, encodedRow);
+ } catch (IOException e) {
+ throw new StreamingIOFailure("Error writing record in transaction write id("
+ + writeId + ")", e);
+ }
+
+ }
+
+ /**
+ * Creates JsonSerDe
+ * @param tbl used to create serde
+ * @param conf used to create serde
+ * @return the initialized JsonSerDe
+ * @throws SerializationError if serde could not be initialized
+ */
+ private static JsonSerDe createSerde(Table tbl, HiveConf conf)
+ throws SerializationError {
+ try {
+ Properties tableProps = MetaStoreUtils.getTableMetadata(tbl);
+ JsonSerDe serde = new JsonSerDe();
+ SerDeUtils.initializeSerDe(serde, conf, tableProps, null);
+ return serde;
+ } catch (SerDeException e) {
+ throw new SerializationError("Error initializing serde " + JsonSerDe.class.getName(), e);
+ }
+ }
+
+ @Override
+ public Object encode(byte[] utf8StrRecord) throws SerializationError {
+ try {
+ Text blob = new Text(utf8StrRecord);
+ return serde.deserialize(blob);
+ } catch (SerDeException e) {
+ throw new SerializationError("Unable to convert byte[] record into Object", e);
+ }
+ }
+
+}
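
For context, a hedged sketch (not part of the patch) of feeding this writer: assuming a transactional, bucketed ORC target table such as default.alerts (id INT, msg STRING), each record is one UTF-8 JSON object whose keys match the column names.

package org.apache.hive.streaming;

import java.nio.charset.StandardCharsets;

public class JsonIngestSketch {
  public static void writeOne(HiveEndPoint endPt, StreamingConnection conn)
      throws StreamingException, InterruptedException {
    StrictJsonWriter writer = new StrictJsonWriter(endPt, conn);
    TransactionBatch batch = conn.fetchTransactionBatch(1, writer);
    try {
      batch.beginNextTransaction();
      // keys "id" and "msg" match the illustrative table's columns
      batch.write("{\"id\": 42, \"msg\": \"disk full\"}"
          .getBytes(StandardCharsets.UTF_8));
      batch.commit();
    } finally {
      batch.close();
    }
  }
}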
http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java b/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java
new file mode 100644
index 0000000..c0b7324
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.RegexSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Streaming writer that handles text input parsed with a regex. Uses
+ * org.apache.hadoop.hive.serde2.RegexSerDe
+ */
+public class StrictRegexWriter extends AbstractRecordWriter {
+ private RegexSerDe serde;
+ private final StructObjectInspector recordObjInspector;
+ private final ObjectInspector[] bucketObjInspectors;
+ private final StructField[] bucketStructFields;
+
+ /**
+ * @param endPoint the end point to write to
+ * @param conn connection this Writer is to be used with
+ * @throws ConnectionError
+ * @throws SerializationError
+ * @throws StreamingException
+ */
+ public StrictRegexWriter(HiveEndPoint endPoint, StreamingConnection conn)
+ throws ConnectionError, SerializationError, StreamingException {
+ this(null, endPoint, null, conn);
+ }
+
+ /**
+ * @param endPoint the end point to write to
+ * @param conf a Hive conf object. Should be null if not using advanced Hive settings.
+ * @param conn connection this Writer is to be used with
+ * @throws ConnectionError
+ * @throws SerializationError
+ * @throws StreamingException
+ */
+ public StrictRegexWriter(HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
+ throws ConnectionError, SerializationError, StreamingException {
+ this(null, endPoint, conf, conn);
+ }
+
+ /**
+ * @param regex to parse the data
+ * @param endPoint the end point to write to
+ * @param conf a Hive conf object. Should be null if not using advanced Hive settings.
+ * @param conn connection this Writer is to be used with
+ * @throws ConnectionError
+ * @throws SerializationError
+ * @throws StreamingException
+ */
+ public StrictRegexWriter(String regex, HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
+ throws ConnectionError, SerializationError, StreamingException {
+ super(endPoint, conf, conn);
+ this.serde = createSerde(tbl, conf, regex);
+ // get ObjInspectors for entire record and bucketed cols
+ try {
+ recordObjInspector = (StructObjectInspector) serde.getObjectInspector();
+ this.bucketObjInspectors = getObjectInspectorsForBucketedCols(bucketIds, recordObjInspector);
+ } catch (SerDeException e) {
+ throw new SerializationError("Unable to get ObjectInspector for bucket columns", e);
+ }
+
+ // get StructFields for bucketed cols
+ bucketStructFields = new StructField[bucketIds.size()];
+ List<? extends StructField> allFields = recordObjInspector.getAllStructFieldRefs();
+ for (int i = 0; i < bucketIds.size(); i++) {
+ bucketStructFields[i] = allFields.get(bucketIds.get(i));
+ }
+ }
+
+ @Override
+ public AbstractSerDe getSerde() {
+ return serde;
+ }
+
+ @Override
+ protected StructObjectInspector getRecordObjectInspector() {
+ return recordObjInspector;
+ }
+
+ @Override
+ protected StructField[] getBucketStructFields() {
+ return bucketStructFields;
+ }
+
+ @Override
+ protected ObjectInspector[] getBucketObjectInspectors() {
+ return bucketObjInspectors;
+ }
+
+
+ @Override
+ public void write(long writeId, byte[] record)
+ throws StreamingIOFailure, SerializationError {
+ try {
+ Object encodedRow = encode(record);
+ int bucket = getBucket(encodedRow);
+ getRecordUpdater(bucket).insert(writeId, encodedRow);
+ } catch (IOException e) {
+ throw new StreamingIOFailure("Error writing record in transaction write id("
+ + writeId + ")", e);
+ }
+ }
+
+ /**
+ * Creates RegexSerDe
+ *
+ * @param tbl used to create serde
+ * @param conf used to create serde
+ * @param regex used to create serde
+ * @return the initialized RegexSerDe
+ * @throws SerializationError if serde could not be initialized
+ */
+ private static RegexSerDe createSerde(Table tbl, HiveConf conf, String regex)
+ throws SerializationError {
+ try {
+ Properties tableProps = MetaStoreUtils.getTableMetadata(tbl);
+ tableProps.setProperty(RegexSerDe.INPUT_REGEX, regex);
+ ArrayList<String> tableColumns = getCols(tbl);
+ tableProps.setProperty(serdeConstants.LIST_COLUMNS, StringUtils.join(tableColumns, ","));
+ RegexSerDe serde = new RegexSerDe();
+ SerDeUtils.initializeSerDe(serde, conf, tableProps, null);
+ return serde;
+ } catch (SerDeException e) {
+ throw new SerializationError("Error initializing serde " + RegexSerDe.class.getName(), e);
+ }
+ }
+
+ private static ArrayList<String> getCols(Table table) {
+ List<FieldSchema> cols = table.getSd().getCols();
+ ArrayList<String> colNames = new ArrayList<String>(cols.size());
+ for (FieldSchema col : cols) {
+ colNames.add(col.getName().toLowerCase());
+ }
+ return colNames;
+ }
+
+ /**
+ * Encode Utf8 encoded string bytes using RegexSerDe
+ *
+ * @param utf8StrRecord a UTF-8 encoded record to be parsed by the regex
+ * @return The encoded object
+ * @throws SerializationError
+ */
+ @Override
+ public Object encode(byte[] utf8StrRecord) throws SerializationError {
+ try {
+ Text blob = new Text(utf8StrRecord);
+ return serde.deserialize(blob);
+ } catch (SerDeException e) {
+ throw new SerializationError("Unable to convert byte[] record into Object", e);
+ }
+ }
+
+}
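
Similarly, a hedged sketch (not part of the patch) for the regex writer: for an illustrative two-column table (id INT, msg STRING) fed pipe-delimited lines, the capture groups map positionally to the table columns, as with RegexSerDe.

package org.apache.hive.streaming;

import java.nio.charset.StandardCharsets;

public class RegexIngestSketch {
  public static void writeOne(HiveEndPoint endPt, StreamingConnection conn)
      throws StreamingException, InterruptedException {
    // one capture group per column: id, then msg
    String regex = "([^|]*)\\|(.*)";
    StrictRegexWriter writer = new StrictRegexWriter(regex, endPt, null, conn);
    TransactionBatch batch = conn.fetchTransactionBatch(1, writer);
    try {
      batch.beginNextTransaction();
      batch.write("42|disk full".getBytes(StandardCharsets.UTF_8));
      batch.commit();
    } finally {
      batch.close();
    }
  }
}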
http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java b/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java
new file mode 100644
index 0000000..2b05771
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+
+import java.util.Collection;
+
+/**
+ * Represents a set of Transactions returned by Hive. Supports opening, writing to
+ * and committing/aborting each transaction. The interface is designed to ensure
+ * transactions in a batch are used up sequentially. To stream to the same HiveEndPoint
+ * concurrently, create separate StreamingConnections.
+ *
+ * Note on thread safety: At most 2 threads can run through a given TransactionBatch at the same
+ * time. One thread may call {@link #heartbeat()} while the other calls all other methods.
+ * Violating this may result in "out of sequence response".
+ *
+ */
+public interface TransactionBatch {
+ enum TxnState {
+ INACTIVE("I"), OPEN("O"), COMMITTED("C"), ABORTED("A");
+
+ private final String code;
+ TxnState(String code) {
+ this.code = code;
+ }
+ public String toString() {
+ return code;
+ }
+ }
+
+ /**
+ * Activate the next available transaction in the current transaction batch.
+ * @throws StreamingException if not able to switch to next Txn
+ * @throws InterruptedException if the call is interrupted
+ */
+ void beginNextTransaction() throws StreamingException, InterruptedException;
+
+ /**
+ * Get Id of currently open transaction.
+ * @return transaction id
+ */
+ Long getCurrentTxnId();
+
+
+ /**
+ * Get write Id mapping to currently open transaction.
+ * @return write id
+ */
+ Long getCurrentWriteId();
+
+ /**
+ * get state of current transaction.
+ */
+ TxnState getCurrentTransactionState();
+
+ /**
+ * Commit the currently open transaction.
+ * @throws StreamingException if there are errors committing
+ * @throws InterruptedException if the call is interrupted
+ */
+ void commit() throws StreamingException, InterruptedException;
+
+ /**
+ * Abort the currently open transaction.
+ * @throws StreamingException if there are errors
+ * @throws InterruptedException if the call is interrupted
+ */
+ void abort() throws StreamingException, InterruptedException;
+
+ /**
+ * Remaining transactions are those that have not yet been opened, committed or aborted.
+ * The currently open transaction is not counted among the remaining txns.
+ * @return number of transactions remaining in this batch.
+ */
+ int remainingTransactions();
+
+
+ /**
+ * Write record using RecordWriter.
+ * @param record the data to be written
+ * @throws StreamingException if there are errors when writing
+ * @throws InterruptedException if the call is interrupted
+ */
+ void write(byte[] record) throws StreamingException, InterruptedException;
+
+ /**
+ * Write records using RecordWriter.
+ * @param records the records to be written
+ * @throws StreamingException if there are errors when writing
+ * @throws InterruptedException if the call is interrupted
+ */
+ void write(Collection<byte[]> records) throws StreamingException, InterruptedException;
+
+
+ /**
+ * Issues a heartbeat to hive metastore on the current and remaining txn ids
+ * to keep them from expiring.
+ * @throws StreamingException if there are errors
+ */
+ void heartbeat() throws StreamingException;
+
+ /**
+ * Close the TransactionBatch.
+ * @throws StreamingException if there are errors closing batch
+ * @throws InterruptedException if the call is interrupted
+ */
+ void close() throws StreamingException, InterruptedException;
+ boolean isClosed();
+}
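
A hedged sketch (not part of the patch) of the two-thread pattern the javadoc above permits: one thread drives the batch while a scheduled task calls heartbeat() to keep the open and remaining txns from timing out. The 2-minute period is an arbitrary illustrative choice, not a value mandated by this module.

package org.apache.hive.streaming;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class HeartbeatingIngestSketch {
  public static void run(StreamingConnection conn, RecordWriter writer, byte[] record)
      throws StreamingException, InterruptedException {
    ScheduledExecutorService heartbeater = Executors.newSingleThreadScheduledExecutor();
    TransactionBatch batch = conn.fetchTransactionBatch(10, writer);
    heartbeater.scheduleAtFixedRate(() -> {
      try {
        batch.heartbeat();
      } catch (StreamingException e) {
        // log and let the writing thread discover the failure on its next call
        e.printStackTrace();
      }
    }, 2, 2, TimeUnit.MINUTES);
    try {
      while (batch.remainingTransactions() > 0) {
        batch.beginNextTransaction();
        batch.write(record);
        batch.commit();
      }
    } finally {
      heartbeater.shutdownNow();
      batch.close();
    }
  }
}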
http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/TransactionBatchUnAvailable.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/TransactionBatchUnAvailable.java b/streaming/src/java/org/apache/hive/streaming/TransactionBatchUnAvailable.java
new file mode 100644
index 0000000..a8c8cd4
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/TransactionBatchUnAvailable.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class TransactionBatchUnAvailable extends StreamingException {
+ public TransactionBatchUnAvailable(HiveEndPoint ep, Exception e) {
+ super("Unable to acquire transaction batch on end point: " + ep, e);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/TransactionError.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/TransactionError.java b/streaming/src/java/org/apache/hive/streaming/TransactionError.java
new file mode 100644
index 0000000..a331b20
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/TransactionError.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class TransactionError extends StreamingException {
+ public TransactionError(String msg, Exception e) {
+ super(msg + (e == null ? "" : ": " + e.getMessage()), e);
+ }
+
+ public TransactionError(String msg) {
+ super(msg);
+ }
+}