You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ek...@apache.org on 2016/07/09 02:04:11 UTC

[2/2] hive git commit: HIVE-14114 Ensure RecordWriter in streaming API is using the same UserGroupInformation as StreamingConnection (Eugene Koifman, reviewed by Wei Zheng)

HIVE-14114 Ensure RecordWriter in streaming API is using the same UserGroupInformation as StreamingConnection (Eugene Koifman, reviewed by Wei Zheng)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cec61d94
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cec61d94
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cec61d94

Branch: refs/heads/branch-2.1
Commit: cec61d945e7949f3585e88d777f0687664fcb85e
Parents: c5a9b0c
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Fri Jul 8 18:34:08 2016 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Fri Jul 8 18:34:08 2016 -0700

----------------------------------------------------------------------
 .../streaming/AbstractRecordWriter.java         | 61 +++++++++++++------
 .../streaming/DelimitedInputWriter.java         | 47 +++++++++++----
 .../hive/hcatalog/streaming/HiveEndPoint.java   | 10 +++-
 .../hcatalog/streaming/StreamingConnection.java |  6 ++
 .../hcatalog/streaming/StrictJsonWriter.java    | 26 +++++---
 .../hive/hcatalog/streaming/TestStreaming.java  | 63 +++++++++++---------
 6 files changed, 149 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/cec61d94/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
index 0c6b9ea..974c6b8 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
@@ -19,6 +19,7 @@
 package org.apache.hive.hcatalog.streaming;
 
 
+import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.Path;
@@ -43,6 +44,7 @@ import org.apache.thrift.TException;
 
 import java.io.IOException;
 
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -68,34 +70,59 @@ public abstract class AbstractRecordWriter implements RecordWriter {
   private Long curBatchMinTxnId;
   private Long curBatchMaxTxnId;
 
+  private static final class TableWriterPair {
+    private final Table tbl;
+    private final Path partitionPath;
+    TableWriterPair(Table t, Path p) {
+      tbl = t;
+      partitionPath = p;
+    }
+  }
+  /**
+   * @deprecated As of release 1.3/2.1.  Replaced by {@link #AbstractRecordWriter(HiveEndPoint, HiveConf, StreamingConnection)}
+   */
   protected AbstractRecordWriter(HiveEndPoint endPoint, HiveConf conf)
-          throws ConnectionError, StreamingException {
-    this.endPoint = endPoint;
+    throws ConnectionError, StreamingException {
+    this(endPoint, conf, null);
+  }
+  protected AbstractRecordWriter(HiveEndPoint endPoint2, HiveConf conf, StreamingConnection conn)
+          throws StreamingException {
+    this.endPoint = endPoint2;
     this.conf = conf!=null ? conf
                 : HiveEndPoint.createHiveConf(DelimitedInputWriter.class, endPoint.metaStoreUri);
     try {
       msClient = HCatUtil.getHiveMetastoreClient(this.conf);
-      this.tbl = msClient.getTable(endPoint.database, endPoint.table);
-      this.partitionPath = getPathForEndPoint(msClient, endPoint);
+      UserGroupInformation ugi = conn != null ? conn.getUserGroupInformation() : null;
+      if (ugi == null) {
+        this.tbl = msClient.getTable(endPoint.database, endPoint.table);
+        this.partitionPath = getPathForEndPoint(msClient, endPoint);
+      } else {
+        TableWriterPair twp = ugi.doAs(
+          new PrivilegedExceptionAction<TableWriterPair>() {
+            @Override
+            public TableWriterPair run() throws Exception {
+              return new TableWriterPair(msClient.getTable(endPoint.database, endPoint.table),
+                getPathForEndPoint(msClient, endPoint));
+            }
+          });
+        this.tbl = twp.tbl;
+        this.partitionPath = twp.partitionPath;
+      }
       this.totalBuckets = tbl.getSd().getNumBuckets();
-      if(totalBuckets <= 0) {
+      if (totalBuckets <= 0) {
         throw new StreamingException("Cannot stream to table that has not been bucketed : "
-                + endPoint);
+          + endPoint);
       }
-      this.bucketIds = getBucketColIDs(tbl.getSd().getBucketCols(), tbl.getSd().getCols()) ;
+      this.bucketIds = getBucketColIDs(tbl.getSd().getBucketCols(), tbl.getSd().getCols());
       this.bucketFieldData = new Object[bucketIds.size()];
       String outFormatName = this.tbl.getSd().getOutputFormat();
-      outf = (AcidOutputFormat<?,?>) ReflectionUtils.newInstance(JavaUtils.loadClass(outFormatName), conf);
+      outf = (AcidOutputFormat<?, ?>) ReflectionUtils.newInstance(JavaUtils.loadClass(outFormatName), conf);
       bucketFieldData = new Object[bucketIds.size()];
-    } catch (MetaException e) {
-      throw new ConnectionError(endPoint, e);
-    } catch (NoSuchObjectException e) {
-      throw new ConnectionError(endPoint, e);
-    } catch (TException e) {
-      throw new StreamingException(e.getMessage(), e);
-    } catch (ClassNotFoundException e) {
-      throw new StreamingException(e.getMessage(), e);
-    } catch (IOException e) {
+    } catch(InterruptedException e) {
+      throw new StreamingException(endPoint2.toString(), e);
+    } catch (MetaException | NoSuchObjectException e) {
+      throw new ConnectionError(endPoint2, e);
+    } catch (TException | ClassNotFoundException | IOException e) {
       throw new StreamingException(e.getMessage(), e);
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/cec61d94/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
index 394cc54..7ab2fc6 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
@@ -20,6 +20,7 @@ package org.apache.hive.hcatalog.streaming;
 
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -73,12 +74,11 @@ public class DelimitedInputWriter extends AbstractRecordWriter {
    * @throws InvalidColumn any element in colNamesForFields refers to a non existing column
    */
   public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
-                              HiveEndPoint endPoint)
-          throws ClassNotFoundException, ConnectionError, SerializationError,
-                 InvalidColumn, StreamingException {
-    this(colNamesForFields, delimiter, endPoint, null);
+                              HiveEndPoint endPoint, StreamingConnection conn)
+    throws ClassNotFoundException, ConnectionError, SerializationError,
+      InvalidColumn, StreamingException {
+    this(colNamesForFields, delimiter, endPoint, null, conn);
   }
-
  /** Constructor. Uses default separator of the LazySimpleSerde
   * @param colNamesForFields Column name assignment for input fields. nulls or empty
   *                          strings in the array indicates the fields to be skipped
@@ -92,13 +92,12 @@ public class DelimitedInputWriter extends AbstractRecordWriter {
   * @throws InvalidColumn any element in colNamesForFields refers to a non existing column
   */
    public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
-                              HiveEndPoint endPoint, HiveConf conf)
+                              HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
           throws ClassNotFoundException, ConnectionError, SerializationError,
                  InvalidColumn, StreamingException {
      this(colNamesForFields, delimiter, endPoint, conf,
-             (char) LazySerDeParameters.DefaultSeparators[0]);
+       (char) LazySerDeParameters.DefaultSeparators[0], conn);
    }
-
   /**
    * Constructor. Allows overriding separator of the LazySimpleSerde
    * @param colNamesForFields Column name assignment for input fields
@@ -108,6 +107,7 @@ public class DelimitedInputWriter extends AbstractRecordWriter {
    * @param serdeSeparator separator used when encoding data that is fed into the
    *                             LazySimpleSerde. Ensure this separator does not occur
    *                             in the field data
+   * @param conn connection this Writer is to be used with
    * @throws ConnectionError Problem talking to Hive
    * @throws ClassNotFoundException Serde class not found
    * @throws SerializationError Serde initialization/interaction failed
@@ -115,10 +115,10 @@ public class DelimitedInputWriter extends AbstractRecordWriter {
    * @throws InvalidColumn any element in colNamesForFields refers to a non existing column
    */
   public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
-                              HiveEndPoint endPoint, HiveConf conf, char serdeSeparator)
+                              HiveEndPoint endPoint, HiveConf conf, char serdeSeparator, StreamingConnection conn)
           throws ClassNotFoundException, ConnectionError, SerializationError,
                  InvalidColumn, StreamingException {
-    super(endPoint, conf);
+    super(endPoint, conf, conn);
     this.tableColumns = getCols(tbl);
     this.serdeSeparator = serdeSeparator;
     this.delimiter = delimiter;
@@ -143,6 +143,33 @@ public class DelimitedInputWriter extends AbstractRecordWriter {
       bucketStructFields[i] = allFields.get(bucketIds.get(i));
     }
   }
+  /**
+   * @deprecated As of release 1.3/2.1.  Replaced by {@link #DelimitedInputWriter(String[], String, HiveEndPoint, StreamingConnection)}
+   */
+  public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+                              HiveEndPoint endPoint)
+    throws ClassNotFoundException, ConnectionError, SerializationError,
+    InvalidColumn, StreamingException {
+    this(colNamesForFields, delimiter, endPoint, null, null);
+  }
+  /**
+   * @deprecated As of release 1.3/2.1.  Replaced by {@link #DelimitedInputWriter(String[], String, HiveEndPoint, HiveConf, StreamingConnection)}
+   */
+  public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+                              HiveEndPoint endPoint, HiveConf conf)
+    throws ClassNotFoundException, ConnectionError, SerializationError,
+    InvalidColumn, StreamingException {
+    this(colNamesForFields, delimiter, endPoint, conf,
+      (char) LazySerDeParameters.DefaultSeparators[0], null);
+  }
+  /**
+   * @deprecated As of release 1.3/2.1.  Replaced by {@link #DelimitedInputWriter(String[], String, HiveEndPoint, HiveConf, char, StreamingConnection)}
+   */
+  public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+                              HiveEndPoint endPoint, HiveConf conf, char serdeSeparator)
+    throws ClassNotFoundException, StreamingException {
+    this(colNamesForFields, delimiter, endPoint, conf, serdeSeparator, null);
+  }
 
   private boolean isReorderingNeeded(String delimiter, ArrayList<String> tableColumns) {
     return !( delimiter.equals(String.valueOf(getSerdeSeparator()))

http://git-wip-us.apache.org/repos/asf/hive/blob/cec61d94/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
index 452cb15..1a7cfae 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
@@ -97,7 +97,7 @@ public class HiveEndPoint {
 
 
   /**
-   * @deprecated Use {@link #newConnection(boolean, String)}
+   * @deprecated As of release 1.3/2.1.  Replaced by {@link #newConnection(boolean, String)}
    */
   public StreamingConnection newConnection(final boolean createPartIfNotExists)
     throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
@@ -105,7 +105,7 @@ public class HiveEndPoint {
     return newConnection(createPartIfNotExists, null, null, null);
   }
   /**
-   * @deprecated Use {@link #newConnection(boolean, HiveConf, String)}
+   * @deprecated As of release 1.3/2.1.  Replaced by {@link #newConnection(boolean, HiveConf, String)}
    */
   public StreamingConnection newConnection(final boolean createPartIfNotExists, HiveConf conf)
     throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
@@ -113,7 +113,7 @@ public class HiveEndPoint {
     return newConnection(createPartIfNotExists, conf, null, null);
   }
   /**
-   * @deprecated Use {@link #newConnection(boolean, HiveConf, UserGroupInformation, String)}
+   * @deprecated As of release 1.3/2.1.  Replaced by {@link #newConnection(boolean, HiveConf, UserGroupInformation, String)}
    */
   public StreamingConnection newConnection(final boolean createPartIfNotExists, final HiveConf conf,
                                            final UserGroupInformation authenticatedUser)
@@ -395,6 +395,10 @@ public class HiveEndPoint {
       }
     }
 
+    @Override
+    public UserGroupInformation getUserGroupInformation() {
+      return ugi;
+    }
 
     /**
      * Acquires a new batch of transactions from Hive.

http://git-wip-us.apache.org/repos/asf/hive/blob/cec61d94/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java
index 25acff0..8785a21 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java
@@ -18,6 +18,8 @@
 
 package org.apache.hive.hcatalog.streaming;
 
+import org.apache.hadoop.security.UserGroupInformation;
+
 /**
  * Represents a connection to a HiveEndPoint. Used to acquire transaction batches.
  */
@@ -46,4 +48,8 @@ public interface StreamingConnection {
    */
   public void close();
 
+  /**
+   * @return UserGroupInformation associated with this connection or {@code null} if there is none
+   */
+  UserGroupInformation getUserGroupInformation();
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/cec61d94/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
index db73d6b..1facad1 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
@@ -46,28 +46,40 @@ public class StrictJsonWriter extends AbstractRecordWriter {
   private final StructField[] bucketStructFields;
 
   /**
-   *
+   * @deprecated As of release 1.3/2.1.  Replaced by {@link #StrictJsonWriter(HiveEndPoint, HiveConf, StreamingConnection)}
+   */
+  public StrictJsonWriter(HiveEndPoint endPoint)
+    throws ConnectionError, SerializationError, StreamingException {
+    this(endPoint, null, null);
+  }
+
+  /**
+   * @deprecated As of release 1.3/2.1.  Replaced by {@link #StrictJsonWriter(HiveEndPoint, HiveConf, StreamingConnection)}
+   */
+  public StrictJsonWriter(HiveEndPoint endPoint, HiveConf conf) throws StreamingException {
+    this(endPoint, conf, null);
+  }
+  /**
    * @param endPoint the end point to write to
    * @throws ConnectionError
    * @throws SerializationError
    * @throws StreamingException
    */
-  public StrictJsonWriter(HiveEndPoint endPoint)
+  public StrictJsonWriter(HiveEndPoint endPoint, StreamingConnection conn)
           throws ConnectionError, SerializationError, StreamingException {
-    this(endPoint, null);
+    this(endPoint, null, conn);
   }
-
   /**
-   *
    * @param endPoint the end point to write to
    * @param conf a Hive conf object. Should be null if not using advanced Hive settings.
+   * @param conn connection this Writer is to be used with
    * @throws ConnectionError
    * @throws SerializationError
    * @throws StreamingException
    */
-  public StrictJsonWriter(HiveEndPoint endPoint, HiveConf conf)
+  public StrictJsonWriter(HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
           throws ConnectionError, SerializationError, StreamingException {
-    super(endPoint, conf);
+    super(endPoint, conf, conn);
     this.serde = createSerde(tbl, conf);
     // get ObjInspectors for entire record and bucketed cols
     try {

http://git-wip-us.apache.org/repos/asf/hive/blob/cec61d94/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 84e559d..dedfe3f 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -64,6 +64,8 @@ import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.orc.impl.OrcAcidUtils;
 import org.apache.orc.tools.FileDump;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
@@ -300,11 +302,11 @@ public class TestStreaming {
     List<String> partitionVals = new ArrayList<String>();
     partitionVals.add("2015");
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testing5", "store_sales", partitionVals);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
     DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"ss_sold_date_sk","ss_sold_time_sk", "ss_item_sk",
       "ss_customer_sk", "ss_cdemo_sk", "ss_hdemo_sk", "ss_addr_sk", "ss_store_sk", "ss_promo_sk", "ss_ticket_number", "ss_quantity",
       "ss_wholesale_cost", "ss_list_price", "ss_sales_price", "ss_ext_discount_amt", "ss_ext_sales_price", "ss_ext_wholesale_cost",
-      "ss_ext_list_price", "ss_ext_tax", "ss_coupon_amt", "ss_net_paid", "ss_net_paid_inc_tax", "ss_net_profit"},",", endPt);
-    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+      "ss_ext_list_price", "ss_ext_tax", "ss_coupon_amt", "ss_net_paid", "ss_net_paid_inc_tax", "ss_net_profit"},",", endPt, connection);
 
     TransactionBatch txnBatch =  connection.fetchTransactionBatch(2, writer);
     txnBatch.beginNextTransaction();
@@ -562,8 +564,8 @@ public class TestStreaming {
     // 1)  to partitioned table
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
             partitionVals);
-    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
     StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt, connection);
 
     TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
 
@@ -641,8 +643,8 @@ public class TestStreaming {
   @Test
   public void testHeartbeat() throws Exception {
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
-    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames2,",", endPt);
     StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames2,",", endPt, connection);
 
     TransactionBatch txnBatch =  connection.fetchTransactionBatch(5, writer);
     txnBatch.beginNextTransaction();
@@ -670,8 +672,8 @@ public class TestStreaming {
     // 1) to partitioned table
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
             partitionVals);
-    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
     StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt, connection);
 
     TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
     txnBatch.beginNextTransaction();
@@ -697,28 +699,35 @@ public class TestStreaming {
 
   @Test
   public void testTransactionBatchCommit_Delimited() throws Exception {
+    testTransactionBatchCommit_Delimited(null);
+  }
+  @Test
+  public void testTransactionBatchCommit_DelimitedUGI() throws Exception {
+    testTransactionBatchCommit_Delimited(Utils.getUGI());
+  }
+  private void testTransactionBatchCommit_Delimited(UserGroupInformation ugi) throws Exception {
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
-            partitionVals);
-    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
-    StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());
+      partitionVals);
+    StreamingConnection connection = endPt.newConnection(true, conf, ugi, "UT_" + Thread.currentThread().getName());
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt, conf, connection);
 
     // 1st Txn
     TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
     txnBatch.beginNextTransaction();
     Assert.assertEquals(TransactionBatch.TxnState.OPEN
-            , txnBatch.getCurrentTransactionState());
+      , txnBatch.getCurrentTransactionState());
     txnBatch.write("1,Hello streaming".getBytes());
     txnBatch.commit();
 
     checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
 
     Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
-            , txnBatch.getCurrentTransactionState());
+      , txnBatch.getCurrentTransactionState());
 
     // 2nd Txn
     txnBatch.beginNextTransaction();
     Assert.assertEquals(TransactionBatch.TxnState.OPEN
-            , txnBatch.getCurrentTransactionState());
+      , txnBatch.getCurrentTransactionState());
     txnBatch.write("2,Welcome to streaming".getBytes());
 
     // data should not be visible
@@ -727,11 +736,11 @@ public class TestStreaming {
     txnBatch.commit();
 
     checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
-        "{2, Welcome to streaming}");
+      "{2, Welcome to streaming}");
 
     txnBatch.close();
     Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
-            , txnBatch.getCurrentTransactionState());
+      , txnBatch.getCurrentTransactionState());
 
 
     connection.close();
@@ -739,19 +748,19 @@ public class TestStreaming {
 
     // To Unpartitioned table
     endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
-    writer = new DelimitedInputWriter(fieldNames,",", endPt);
-    connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());
+    connection = endPt.newConnection(true, conf, ugi, "UT_" + Thread.currentThread().getName());
+    writer = new DelimitedInputWriter(fieldNames,",", endPt, conf, connection);
 
     // 1st Txn
     txnBatch =  connection.fetchTransactionBatch(10, writer);
     txnBatch.beginNextTransaction();
     Assert.assertEquals(TransactionBatch.TxnState.OPEN
-            , txnBatch.getCurrentTransactionState());
+      , txnBatch.getCurrentTransactionState());
     txnBatch.write("1,Hello streaming".getBytes());
     txnBatch.commit();
 
     Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
-            , txnBatch.getCurrentTransactionState());
+      , txnBatch.getCurrentTransactionState());
     connection.close();
   }
 
@@ -759,8 +768,8 @@ public class TestStreaming {
   public void testTransactionBatchCommit_Json() throws Exception {
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
             partitionVals);
-    StrictJsonWriter writer = new StrictJsonWriter(endPt);
     StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());
+    StrictJsonWriter writer = new StrictJsonWriter(endPt, connection);
 
     // 1st Txn
     TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
@@ -844,8 +853,8 @@ public class TestStreaming {
 
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
             partitionVals);
-    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
     StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt, connection);
 
 
     TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
@@ -872,8 +881,8 @@ public class TestStreaming {
     String agentInfo = "UT_" + Thread.currentThread().getName();
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
             partitionVals);
-    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
     StreamingConnection connection = endPt.newConnection(false, agentInfo);
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt, connection);
 
     TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
     txnBatch.beginNextTransaction();
@@ -1173,8 +1182,8 @@ public class TestStreaming {
 
     // 2) Insert data into both tables
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null);
-    DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt);
     StreamingConnection connection = endPt.newConnection(false, agentInfo);
+    DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt, connection);
 
     TransactionBatch txnBatch =  connection.fetchTransactionBatch(2, writer);
     txnBatch.beginNextTransaction();
@@ -1186,8 +1195,8 @@ public class TestStreaming {
 
 
     HiveEndPoint endPt2 = new HiveEndPoint(metaStoreURI, dbName4, tblName4, null);
-    DelimitedInputWriter writer2 = new DelimitedInputWriter(colNames2,",", endPt2);
     StreamingConnection connection2 = endPt2.newConnection(false, agentInfo);
+    DelimitedInputWriter writer2 = new DelimitedInputWriter(colNames2,",", endPt2, connection2);
     TransactionBatch txnBatch2 =  connection2.fetchTransactionBatch(2, writer2);
     txnBatch2.beginNextTransaction();
 
@@ -1250,8 +1259,8 @@ public class TestStreaming {
 
     // 2) Insert data into both tables
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null);
-    DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt);
     StreamingConnection connection = endPt.newConnection(false, agentInfo);
+    DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt, connection);
 
     TransactionBatch txnBatch =  connection.fetchTransactionBatch(2, writer);
     txnBatch.beginNextTransaction();
@@ -1322,8 +1331,8 @@ public class TestStreaming {
 
     // 2) Insert data into both tables
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null);
-    DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt);
     StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+    DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt, connection);
 
     // we need side file for this test, so we create 2 txn batch and test with only one
     TransactionBatch txnBatch =  connection.fetchTransactionBatch(2, writer);
@@ -1448,8 +1457,8 @@ public class TestStreaming {
 
     // 2) Insert data into both tables
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null);
-    DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt);
     StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+    DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt, connection);
 
     TransactionBatch txnBatch =  connection.fetchTransactionBatch(2, writer);
     txnBatch.beginNextTransaction();
@@ -1670,9 +1679,9 @@ public class TestStreaming {
     runCmdOnDriver("create table T(a int, b int) clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
 
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testErrors", "T", null);
-    DelimitedInputWriter innerWriter = new DelimitedInputWriter("a,b".split(","),",", endPt);
-    FaultyWriter writer = new FaultyWriter(innerWriter);
     StreamingConnection connection = endPt.newConnection(false, agentInfo);
+    DelimitedInputWriter innerWriter = new DelimitedInputWriter("a,b".split(","),",", endPt, connection);
+    FaultyWriter writer = new FaultyWriter(innerWriter);
 
     TransactionBatch txnBatch =  connection.fetchTransactionBatch(2, writer);
     txnBatch.close();