Posted to commits@hive.apache.org by ek...@apache.org on 2016/07/09 02:04:10 UTC
[1/2] hive git commit: HIVE-14192 False positive error due to thrift (Eugene Koifman, reviewed by Wei Zheng)
Repository: hive
Updated Branches:
refs/heads/branch-2.1 6588b8402 -> cec61d945
HIVE-14192 False positive error due to thrift (Eugene Koifman, reviewed by Wei Zheng)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c5a9b0cb
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c5a9b0cb
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c5a9b0cb
Branch: refs/heads/branch-2.1
Commit: c5a9b0cbc4dffc306d6f7189b63b4009d0fbeb35
Parents: 6588b84
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Fri Jul 8 18:32:44 2016 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Fri Jul 8 18:32:44 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/metastore/txn/TxnHandler.java | 9 +++++++--
1 file changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/c5a9b0cb/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 119b08e..e4025ca 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -927,8 +927,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
List<String> rows = new ArrayList<>();
long intLockId = 0;
for (LockComponent lc : rqst.getComponent()) {
- if(lc.isSetOperationType() && lc.getOperationType() == DataOperationType.UNSET) {
- //old version of thrift client should have (lc.isSetOperationType() == false)
+ if(lc.isSetOperationType() && lc.getOperationType() == DataOperationType.UNSET &&
+ (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) || conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEZ_TEST))) {
+ //old versions of the thrift client should have (lc.isSetOperationType() == false), but they do not.
+ //If you add a default value to a variable, isSet() for that variable is true regardless of where the
+ //message was created (for object variables; it works correctly for boolean vars, e.g. LockComponent.isAcid).
+ //In test mode, upgrades are not tested, so the client and server thrift versions always match; if
+ //we see UNSET here, it means something didn't set the appropriate value.
throw new IllegalStateException("Bug: operationType=" + lc.getOperationType() + " for component "
+ lc + " agentInfo=" + rqst.getAgentInfo());
}
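For context on the thrift behavior described in the comment above, here is a minimal sketch (not part of this commit) using the generated metastore classes. It assumes the IDL gives LockComponent.operationType a default of DataOperationType.UNSET, which is what makes isSet() return true even when no client code ever set the field:

  import org.apache.hadoop.hive.metastore.api.DataOperationType;
  import org.apache.hadoop.hive.metastore.api.LockComponent;

  public class ThriftDefaultDemo {
    public static void main(String[] args) {
      LockComponent lc = new LockComponent();
      // No code has called setOperationType(), yet thrift reports the field
      // as set because the IDL default (UNSET) was applied at construction:
      System.out.println(lc.isSetOperationType()); // true
      System.out.println(lc.getOperationType());   // UNSET
    }
  }

This is why the server cannot tell an old client (which never sends the field) from a new client that failed to set it, and why the hard failure is now limited to test mode, where client and server versions are known to match.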
[2/2] hive git commit: HIVE-14114 Ensure RecordWriter in streaming API is using the same UserGroupInformation as StreamingConnection (Eugene Koifman, reviewed by Wei Zheng)
Posted by ek...@apache.org.
HIVE-14114 Ensure RecordWriter in streaming API is using the same UserGroupInformation as StreamingConnection (Eugene Koifman, reviewed by Wei Zheng)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cec61d94
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cec61d94
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cec61d94
Branch: refs/heads/branch-2.1
Commit: cec61d945e7949f3585e88d777f0687664fcb85e
Parents: c5a9b0c
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Fri Jul 8 18:34:08 2016 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Fri Jul 8 18:34:08 2016 -0700
----------------------------------------------------------------------
.../streaming/AbstractRecordWriter.java | 61 +++++++++++++------
.../streaming/DelimitedInputWriter.java | 47 +++++++++++----
.../hive/hcatalog/streaming/HiveEndPoint.java | 10 +++-
.../hcatalog/streaming/StreamingConnection.java | 6 ++
.../hcatalog/streaming/StrictJsonWriter.java | 26 +++++---
.../hive/hcatalog/streaming/TestStreaming.java | 63 +++++++++++---------
6 files changed, 149 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/cec61d94/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
index 0c6b9ea..974c6b8 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
@@ -19,6 +19,7 @@
package org.apache.hive.hcatalog.streaming;
+import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.Path;
@@ -43,6 +44,7 @@ import org.apache.thrift.TException;
import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -68,34 +70,59 @@ public abstract class AbstractRecordWriter implements RecordWriter {
private Long curBatchMinTxnId;
private Long curBatchMaxTxnId;
+ private static final class TableWriterPair {
+ private final Table tbl;
+ private final Path partitionPath;
+ TableWriterPair(Table t, Path p) {
+ tbl = t;
+ partitionPath = p;
+ }
+ }
+ /**
+ * @deprecated As of release 1.3/2.1. Replaced by {@link #AbstractRecordWriter(HiveEndPoint, HiveConf, StreamingConnection)}
+ */
protected AbstractRecordWriter(HiveEndPoint endPoint, HiveConf conf)
- throws ConnectionError, StreamingException {
- this.endPoint = endPoint;
+ throws ConnectionError, StreamingException {
+ this(endPoint, conf, null);
+ }
+ protected AbstractRecordWriter(HiveEndPoint endPoint2, HiveConf conf, StreamingConnection conn)
+ throws StreamingException {
+ this.endPoint = endPoint2;
this.conf = conf!=null ? conf
: HiveEndPoint.createHiveConf(DelimitedInputWriter.class, endPoint.metaStoreUri);
try {
msClient = HCatUtil.getHiveMetastoreClient(this.conf);
- this.tbl = msClient.getTable(endPoint.database, endPoint.table);
- this.partitionPath = getPathForEndPoint(msClient, endPoint);
+ UserGroupInformation ugi = conn != null ? conn.getUserGroupInformation() : null;
+ if (ugi == null) {
+ this.tbl = msClient.getTable(endPoint.database, endPoint.table);
+ this.partitionPath = getPathForEndPoint(msClient, endPoint);
+ } else {
+ TableWriterPair twp = ugi.doAs(
+ new PrivilegedExceptionAction<TableWriterPair>() {
+ @Override
+ public TableWriterPair run() throws Exception {
+ return new TableWriterPair(msClient.getTable(endPoint.database, endPoint.table),
+ getPathForEndPoint(msClient, endPoint));
+ }
+ });
+ this.tbl = twp.tbl;
+ this.partitionPath = twp.partitionPath;
+ }
this.totalBuckets = tbl.getSd().getNumBuckets();
- if(totalBuckets <= 0) {
+ if (totalBuckets <= 0) {
throw new StreamingException("Cannot stream to table that has not been bucketed : "
- + endPoint);
+ + endPoint);
}
- this.bucketIds = getBucketColIDs(tbl.getSd().getBucketCols(), tbl.getSd().getCols()) ;
+ this.bucketIds = getBucketColIDs(tbl.getSd().getBucketCols(), tbl.getSd().getCols());
this.bucketFieldData = new Object[bucketIds.size()];
String outFormatName = this.tbl.getSd().getOutputFormat();
- outf = (AcidOutputFormat<?,?>) ReflectionUtils.newInstance(JavaUtils.loadClass(outFormatName), conf);
+ outf = (AcidOutputFormat<?, ?>) ReflectionUtils.newInstance(JavaUtils.loadClass(outFormatName), conf);
bucketFieldData = new Object[bucketIds.size()];
- } catch (MetaException e) {
- throw new ConnectionError(endPoint, e);
- } catch (NoSuchObjectException e) {
- throw new ConnectionError(endPoint, e);
- } catch (TException e) {
- throw new StreamingException(e.getMessage(), e);
- } catch (ClassNotFoundException e) {
- throw new StreamingException(e.getMessage(), e);
- } catch (IOException e) {
+ } catch(InterruptedException e) {
+ throw new StreamingException(endPoint2.toString(), e);
+ } catch (MetaException | NoSuchObjectException e) {
+ throw new ConnectionError(endPoint2, e);
+ } catch (TException | ClassNotFoundException | IOException e) {
throw new StreamingException(e.getMessage(), e);
}
}
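The core of the change is the doAs wrapper: when the connection carries a UserGroupInformation, the writer's metastore lookups run under that identity instead of the process user. A standalone sketch of the pattern (demo class and variable names are assumptions, not Hive code):

  import java.security.PrivilegedExceptionAction;
  import org.apache.hadoop.security.UserGroupInformation;

  public class DoAsDemo {
    public static void main(String[] args) throws Exception {
      UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
      // Everything inside run() executes with ugi's credentials, so calls
      // to a Kerberos-secured metastore or HDFS authenticate as ugi rather
      // than as the user that owns the JVM process:
      String who = ugi.doAs(new PrivilegedExceptionAction<String>() {
        @Override
        public String run() throws Exception {
          return UserGroupInformation.getCurrentUser().getUserName();
        }
      });
      System.out.println(who);
    }
  }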
http://git-wip-us.apache.org/repos/asf/hive/blob/cec61d94/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
index 394cc54..7ab2fc6 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
@@ -20,6 +20,7 @@ package org.apache.hive.hcatalog.streaming;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -73,12 +74,11 @@ public class DelimitedInputWriter extends AbstractRecordWriter {
* @throws InvalidColumn any element in colNamesForFields refers to a non existing column
*/
public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
- HiveEndPoint endPoint)
- throws ClassNotFoundException, ConnectionError, SerializationError,
- InvalidColumn, StreamingException {
- this(colNamesForFields, delimiter, endPoint, null);
+ HiveEndPoint endPoint, StreamingConnection conn)
+ throws ClassNotFoundException, ConnectionError, SerializationError,
+ InvalidColumn, StreamingException {
+ this(colNamesForFields, delimiter, endPoint, null, conn);
}
-
/** Constructor. Uses default separator of the LazySimpleSerde
* @param colNamesForFields Column name assignment for input fields. nulls or empty
* strings in the array indicates the fields to be skipped
@@ -92,13 +92,12 @@ public class DelimitedInputWriter extends AbstractRecordWriter {
* @throws InvalidColumn any element in colNamesForFields refers to a non existing column
*/
public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
- HiveEndPoint endPoint, HiveConf conf)
+ HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
throws ClassNotFoundException, ConnectionError, SerializationError,
InvalidColumn, StreamingException {
this(colNamesForFields, delimiter, endPoint, conf,
- (char) LazySerDeParameters.DefaultSeparators[0]);
+ (char) LazySerDeParameters.DefaultSeparators[0], conn);
}
-
/**
* Constructor. Allows overriding separator of the LazySimpleSerde
* @param colNamesForFields Column name assignment for input fields
@@ -108,6 +107,7 @@ public class DelimitedInputWriter extends AbstractRecordWriter {
* @param serdeSeparator separator used when encoding data that is fed into the
* LazySimpleSerde. Ensure this separator does not occur
* in the field data
+ * @param conn connection this Writer is to be used with
* @throws ConnectionError Problem talking to Hive
* @throws ClassNotFoundException Serde class not found
* @throws SerializationError Serde initialization/interaction failed
@@ -115,10 +115,10 @@ public class DelimitedInputWriter extends AbstractRecordWriter {
* @throws InvalidColumn any element in colNamesForFields refers to a non existing column
*/
public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
- HiveEndPoint endPoint, HiveConf conf, char serdeSeparator)
+ HiveEndPoint endPoint, HiveConf conf, char serdeSeparator, StreamingConnection conn)
throws ClassNotFoundException, ConnectionError, SerializationError,
InvalidColumn, StreamingException {
- super(endPoint, conf);
+ super(endPoint, conf, conn);
this.tableColumns = getCols(tbl);
this.serdeSeparator = serdeSeparator;
this.delimiter = delimiter;
@@ -143,6 +143,33 @@ public class DelimitedInputWriter extends AbstractRecordWriter {
bucketStructFields[i] = allFields.get(bucketIds.get(i));
}
}
+ /**
+ * @deprecated As of release 1.3/2.1. Replaced by {@link #DelimitedInputWriter(String[], String, HiveEndPoint, StreamingConnection)}
+ */
+ public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+ HiveEndPoint endPoint)
+ throws ClassNotFoundException, ConnectionError, SerializationError,
+ InvalidColumn, StreamingException {
+ this(colNamesForFields, delimiter, endPoint, null, null);
+ }
+ /**
+ * @deprecated As of release 1.3/2.1. Replaced by {@link #DelimitedInputWriter(String[], String, HiveEndPoint, HiveConf, StreamingConnection)}
+ */
+ public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+ HiveEndPoint endPoint, HiveConf conf)
+ throws ClassNotFoundException, ConnectionError, SerializationError,
+ InvalidColumn, StreamingException {
+ this(colNamesForFields, delimiter, endPoint, conf,
+ (char) LazySerDeParameters.DefaultSeparators[0], null);
+ }
+ /**
+ * @deprecated As of release 1.3/2.1. Replaced by {@link #DelimitedInputWriter(String[], String, HiveEndPoint, HiveConf, char, StreamingConnection)}
+ */
+ public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+ HiveEndPoint endPoint, HiveConf conf, char serdeSeparator)
+ throws ClassNotFoundException, StreamingException {
+ this(colNamesForFields, delimiter, endPoint, conf, serdeSeparator, null);
+ }
private boolean isReorderingNeeded(String delimiter, ArrayList<String> tableColumns) {
return !( delimiter.equals(String.valueOf(getSerdeSeparator()))
http://git-wip-us.apache.org/repos/asf/hive/blob/cec61d94/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
index 452cb15..1a7cfae 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
@@ -97,7 +97,7 @@ public class HiveEndPoint {
/**
- * @deprecated Use {@link #newConnection(boolean, String)}
+ * @deprecated As of release 1.3/2.1. Replaced by {@link #newConnection(boolean, String)}
*/
public StreamingConnection newConnection(final boolean createPartIfNotExists)
throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
@@ -105,7 +105,7 @@ public class HiveEndPoint {
return newConnection(createPartIfNotExists, null, null, null);
}
/**
- * @deprecated Use {@link #newConnection(boolean, HiveConf, String)}
+ * @deprecated As of release 1.3/2.1. Replaced by {@link #newConnection(boolean, HiveConf, String)}
*/
public StreamingConnection newConnection(final boolean createPartIfNotExists, HiveConf conf)
throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
@@ -113,7 +113,7 @@ public class HiveEndPoint {
return newConnection(createPartIfNotExists, conf, null, null);
}
/**
- * @deprecated Use {@link #newConnection(boolean, HiveConf, UserGroupInformation, String)}
+ * @deprecated As of release 1.3/2.1. Replaced by {@link #newConnection(boolean, HiveConf, UserGroupInformation, String)}
*/
public StreamingConnection newConnection(final boolean createPartIfNotExists, final HiveConf conf,
final UserGroupInformation authenticatedUser)
@@ -395,6 +395,10 @@ public class HiveEndPoint {
}
}
+ @Override
+ public UserGroupInformation getUserGroupInformation() {
+ return ugi;
+ }
/**
* Acquires a new batch of transactions from Hive.
http://git-wip-us.apache.org/repos/asf/hive/blob/cec61d94/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java
index 25acff0..8785a21 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java
@@ -18,6 +18,8 @@
package org.apache.hive.hcatalog.streaming;
+import org.apache.hadoop.security.UserGroupInformation;
+
/**
* Represents a connection to a HiveEndPoint. Used to acquire transaction batches.
*/
@@ -46,4 +48,8 @@ public interface StreamingConnection {
*/
public void close();
+ /**
+ * @return UserGroupInformation associated with this connection or {@code null} if there is none
+ */
+ UserGroupInformation getUserGroupInformation();
}
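With getUserGroupInformation() on the interface, a RecordWriter can be handed the connection it will serve and inherit that connection's UGI. A sketch of the call order the tests below switch to (metaStoreURI, fieldNames, conf, and ugi are assumed to be supplied by the caller; database, table, and agent names are placeholders):

  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.security.UserGroupInformation;
  import org.apache.hive.hcatalog.streaming.*;

  public class NewCallOrder {
    static void writeOneBatch(String metaStoreURI, String[] fieldNames,
                              HiveConf conf, UserGroupInformation ugi) throws Exception {
      HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "demoDb", "demoTbl", null);
      // Connection first, then writer: the writer picks up the connection's
      // UGI for its metastore and filesystem access.
      StreamingConnection conn = endPt.newConnection(true, conf, ugi, "demoAgent");
      DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames, ",", endPt, conf, conn);
      TransactionBatch txnBatch = conn.fetchTransactionBatch(10, writer);
      txnBatch.beginNextTransaction();
      txnBatch.write("1,Hello streaming".getBytes());
      txnBatch.commit();
      txnBatch.close();
      conn.close();
    }
  }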
http://git-wip-us.apache.org/repos/asf/hive/blob/cec61d94/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
index db73d6b..1facad1 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
@@ -46,28 +46,40 @@ public class StrictJsonWriter extends AbstractRecordWriter {
private final StructField[] bucketStructFields;
/**
- *
+ * @deprecated As of release 1.3/2.1. Replaced by {@link #StrictJsonWriter(HiveEndPoint, HiveConf, StreamingConnection)}
+ */
+ public StrictJsonWriter(HiveEndPoint endPoint)
+ throws ConnectionError, SerializationError, StreamingException {
+ this(endPoint, null, null);
+ }
+
+ /**
+ * @deprecated As of release 1.3/2.1. Replaced by {@link #StrictJsonWriter(HiveEndPoint, HiveConf, StreamingConnection)}
+ */
+ public StrictJsonWriter(HiveEndPoint endPoint, HiveConf conf) throws StreamingException {
+ this(endPoint, conf, null);
+ }
+ /**
* @param endPoint the end point to write to
* @throws ConnectionError
* @throws SerializationError
* @throws StreamingException
*/
- public StrictJsonWriter(HiveEndPoint endPoint)
+ public StrictJsonWriter(HiveEndPoint endPoint, StreamingConnection conn)
throws ConnectionError, SerializationError, StreamingException {
- this(endPoint, null);
+ this(endPoint, null, conn);
}
-
/**
- *
* @param endPoint the end point to write to
* @param conf a Hive conf object. Should be null if not using advanced Hive settings.
+ * @param conn connection this Writer is to be used with
* @throws ConnectionError
* @throws SerializationError
* @throws StreamingException
*/
- public StrictJsonWriter(HiveEndPoint endPoint, HiveConf conf)
+ public StrictJsonWriter(HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
throws ConnectionError, SerializationError, StreamingException {
- super(endPoint, conf);
+ super(endPoint, conf, conn);
this.serde = createSerde(tbl, conf);
// get ObjInspectors for entire record and bucketed cols
try {
http://git-wip-us.apache.org/repos/asf/hive/blob/cec61d94/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 84e559d..dedfe3f 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -64,6 +64,8 @@ import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.orc.impl.OrcAcidUtils;
import org.apache.orc.tools.FileDump;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
@@ -300,11 +302,11 @@ public class TestStreaming {
List<String> partitionVals = new ArrayList<String>();
partitionVals.add("2015");
HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testing5", "store_sales", partitionVals);
+ StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"ss_sold_date_sk","ss_sold_time_sk", "ss_item_sk",
"ss_customer_sk", "ss_cdemo_sk", "ss_hdemo_sk", "ss_addr_sk", "ss_store_sk", "ss_promo_sk", "ss_ticket_number", "ss_quantity",
"ss_wholesale_cost", "ss_list_price", "ss_sales_price", "ss_ext_discount_amt", "ss_ext_sales_price", "ss_ext_wholesale_cost",
- "ss_ext_list_price", "ss_ext_tax", "ss_coupon_amt", "ss_net_paid", "ss_net_paid_inc_tax", "ss_net_profit"},",", endPt);
- StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+ "ss_ext_list_price", "ss_ext_tax", "ss_coupon_amt", "ss_net_paid", "ss_net_paid_inc_tax", "ss_net_profit"},",", endPt, connection);
TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
txnBatch.beginNextTransaction();
@@ -562,8 +564,8 @@ public class TestStreaming {
// 1) to partitioned table
HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
partitionVals);
- DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+ DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt, connection);
TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
@@ -641,8 +643,8 @@ public class TestStreaming {
@Test
public void testHeartbeat() throws Exception {
HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
- DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames2,",", endPt);
StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+ DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames2,",", endPt, connection);
TransactionBatch txnBatch = connection.fetchTransactionBatch(5, writer);
txnBatch.beginNextTransaction();
@@ -670,8 +672,8 @@ public class TestStreaming {
// 1) to partitioned table
HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
partitionVals);
- DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());
+ DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt, connection);
TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
txnBatch.beginNextTransaction();
@@ -697,28 +699,35 @@ public class TestStreaming {
@Test
public void testTransactionBatchCommit_Delimited() throws Exception {
+ testTransactionBatchCommit_Delimited(null);
+ }
+ @Test
+ public void testTransactionBatchCommit_DelimitedUGI() throws Exception {
+ testTransactionBatchCommit_Delimited(Utils.getUGI());
+ }
+ private void testTransactionBatchCommit_Delimited(UserGroupInformation ugi) throws Exception {
HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
- partitionVals);
- DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
- StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());
+ partitionVals);
+ StreamingConnection connection = endPt.newConnection(true, conf, ugi, "UT_" + Thread.currentThread().getName());
+ DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt, conf, connection);
// 1st Txn
TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
txnBatch.beginNextTransaction();
Assert.assertEquals(TransactionBatch.TxnState.OPEN
- , txnBatch.getCurrentTransactionState());
+ , txnBatch.getCurrentTransactionState());
txnBatch.write("1,Hello streaming".getBytes());
txnBatch.commit();
checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
- , txnBatch.getCurrentTransactionState());
+ , txnBatch.getCurrentTransactionState());
// 2nd Txn
txnBatch.beginNextTransaction();
Assert.assertEquals(TransactionBatch.TxnState.OPEN
- , txnBatch.getCurrentTransactionState());
+ , txnBatch.getCurrentTransactionState());
txnBatch.write("2,Welcome to streaming".getBytes());
// data should not be visible
@@ -727,11 +736,11 @@ public class TestStreaming {
txnBatch.commit();
checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
- "{2, Welcome to streaming}");
+ "{2, Welcome to streaming}");
txnBatch.close();
Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
- , txnBatch.getCurrentTransactionState());
+ , txnBatch.getCurrentTransactionState());
connection.close();
@@ -739,19 +748,19 @@ public class TestStreaming {
// To Unpartitioned table
endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
- writer = new DelimitedInputWriter(fieldNames,",", endPt);
- connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());
+ connection = endPt.newConnection(true, conf, ugi, "UT_" + Thread.currentThread().getName());
+ writer = new DelimitedInputWriter(fieldNames,",", endPt, conf, connection);
// 1st Txn
txnBatch = connection.fetchTransactionBatch(10, writer);
txnBatch.beginNextTransaction();
Assert.assertEquals(TransactionBatch.TxnState.OPEN
- , txnBatch.getCurrentTransactionState());
+ , txnBatch.getCurrentTransactionState());
txnBatch.write("1,Hello streaming".getBytes());
txnBatch.commit();
Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
- , txnBatch.getCurrentTransactionState());
+ , txnBatch.getCurrentTransactionState());
connection.close();
}
@@ -759,8 +768,8 @@ public class TestStreaming {
public void testTransactionBatchCommit_Json() throws Exception {
HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
partitionVals);
- StrictJsonWriter writer = new StrictJsonWriter(endPt);
StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());
+ StrictJsonWriter writer = new StrictJsonWriter(endPt, connection);
// 1st Txn
TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
@@ -844,8 +853,8 @@ public class TestStreaming {
HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
partitionVals);
- DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+ DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt, connection);
TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
@@ -872,8 +881,8 @@ public class TestStreaming {
String agentInfo = "UT_" + Thread.currentThread().getName();
HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
partitionVals);
- DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
StreamingConnection connection = endPt.newConnection(false, agentInfo);
+ DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt, connection);
TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
txnBatch.beginNextTransaction();
@@ -1173,8 +1182,8 @@ public class TestStreaming {
// 2) Insert data into both tables
HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null);
- DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt);
StreamingConnection connection = endPt.newConnection(false, agentInfo);
+ DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt, connection);
TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
txnBatch.beginNextTransaction();
@@ -1186,8 +1195,8 @@ public class TestStreaming {
HiveEndPoint endPt2 = new HiveEndPoint(metaStoreURI, dbName4, tblName4, null);
- DelimitedInputWriter writer2 = new DelimitedInputWriter(colNames2,",", endPt2);
StreamingConnection connection2 = endPt2.newConnection(false, agentInfo);
+ DelimitedInputWriter writer2 = new DelimitedInputWriter(colNames2,",", endPt2, connection2);
TransactionBatch txnBatch2 = connection2.fetchTransactionBatch(2, writer2);
txnBatch2.beginNextTransaction();
@@ -1250,8 +1259,8 @@ public class TestStreaming {
// 2) Insert data into both tables
HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null);
- DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt);
StreamingConnection connection = endPt.newConnection(false, agentInfo);
+ DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt, connection);
TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
txnBatch.beginNextTransaction();
@@ -1322,8 +1331,8 @@ public class TestStreaming {
// 2) Insert data into both tables
HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null);
- DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt);
StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+ DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt, connection);
// we need side file for this test, so we create 2 txn batch and test with only one
TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
@@ -1448,8 +1457,8 @@ public class TestStreaming {
// 2) Insert data into both tables
HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null);
- DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt);
StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+ DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt, connection);
TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
txnBatch.beginNextTransaction();
@@ -1670,9 +1679,9 @@ public class TestStreaming {
runCmdOnDriver("create table T(a int, b int) clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testErrors", "T", null);
- DelimitedInputWriter innerWriter = new DelimitedInputWriter("a,b".split(","),",", endPt);
- FaultyWriter writer = new FaultyWriter(innerWriter);
StreamingConnection connection = endPt.newConnection(false, agentInfo);
+ DelimitedInputWriter innerWriter = new DelimitedInputWriter("a,b".split(","),",", endPt, connection);
+ FaultyWriter writer = new FaultyWriter(innerWriter);
TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
txnBatch.close();