Posted to commits@hive.apache.org by vg...@apache.org on 2016/04/22 21:23:52 UTC

[3/3] hive git commit: HIVE-12049: HiveServer2: Provide an option to write serialized thrift objects in final tasks (Rohit Dholakia reviewed by Ashutosh Chauhan, Gopal Vijayaraghavan, Lefty Leverenz, Vaibhav Gumashta)

HIVE-12049: HiveServer2: Provide an option to write serialized thrift objects in final tasks (Rohit Dholakia reviewed by Ashutosh Chauhan, Gopal Vijayaraghavan, Lefty Leverenz, Vaibhav Gumashta)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/fb230f9d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/fb230f9d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/fb230f9d

Branch: refs/heads/master
Commit: fb230f9df5b7c990c80326671d9975a6f05e1600
Parents: 145e253
Author: Vaibhav Gumashta <vg...@hortonworks.com>
Authored: Fri Apr 22 12:22:55 2016 -0700
Committer: Vaibhav Gumashta <vg...@hortonworks.com>
Committed: Fri Apr 22 12:22:55 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/hive/beeline/Commands.java  |   2 +-
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  11 +
 .../apache/hive/jdbc/TestJdbcWithMiniHS2.java   | 108 ++++-
 .../org/apache/hive/jdbc/HiveBaseResultSet.java |   2 +-
 .../apache/hive/jdbc/HiveResultSetMetaData.java |   2 +-
 .../org/apache/hive/jdbc/HiveStatement.java     |  16 +-
 .../java/org/apache/hive/jdbc/JdbcColumn.java   |   2 +-
 ql/pom.xml                                      |   6 +
 .../java/org/apache/hadoop/hive/ql/Driver.java  |  19 +
 .../hive/ql/exec/DefaultFetchFormatter.java     |  77 ----
 .../hadoop/hive/ql/exec/FetchFormatter.java     |  71 ---
 .../apache/hadoop/hive/ql/exec/FetchTask.java   |   3 +-
 .../hadoop/hive/ql/exec/FileSinkOperator.java   |  27 +-
 .../hadoop/hive/ql/exec/ListSinkOperator.java   |  11 +-
 .../org/apache/hadoop/hive/ql/parse/QB.java     |   5 +
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  |  22 +-
 .../hadoop/hive/ql/parse/TaskCompiler.java      |  62 ++-
 .../apache/hadoop/hive/ql/plan/FetchWork.java   |  10 +
 .../hadoop/hive/ql/plan/FileSinkDesc.java       |   9 +
 .../apache/hadoop/hive/ql/plan/PlanUtils.java   |  10 +-
 serde/pom.xml                                   |   5 +
 .../hive/serde2/DefaultFetchFormatter.java      |  73 +++
 .../hadoop/hive/serde2/FetchFormatter.java      |  37 ++
 .../hadoop/hive/serde2/NoOpFetchFormatter.java  |  48 ++
 .../apache/hadoop/hive/serde2/SerDeUtils.java   |   3 +-
 .../hadoop/hive/serde2/thrift/ColumnBuffer.java | 439 +++++++++++++++++++
 .../hive/serde2/thrift/ThriftFormatter.java     |  40 ++
 .../serde2/thrift/ThriftJDBCBinarySerDe.java    | 178 ++++++++
 .../apache/hadoop/hive/serde2/thrift/Type.java  | 438 ++++++++++++++++++
 .../hadoop/hive/serde2/typeinfo/TypeInfo.java   |  14 +-
 service-rpc/if/TCLIService.thrift               |   2 +
 .../gen/thrift/gen-cpp/TCLIService_types.cpp    |  44 ++
 .../src/gen/thrift/gen-cpp/TCLIService_types.h  |  20 +-
 .../apache/hive/service/rpc/thrift/TRowSet.java | 222 +++++++++-
 service-rpc/src/gen/thrift/gen-php/Types.php    |  46 ++
 .../src/gen/thrift/gen-py/TCLIService/ttypes.py |  28 +-
 .../gen/thrift/gen-rb/t_c_l_i_service_types.rb  |   6 +-
 .../org/apache/hive/service/cli/Column.java     | 434 ------------------
 .../apache/hive/service/cli/ColumnBasedSet.java |  84 +++-
 .../hive/service/cli/ColumnDescriptor.java      |  12 +-
 .../apache/hive/service/cli/ColumnValue.java    |   1 +
 .../apache/hive/service/cli/RowSetFactory.java  |  17 +-
 .../apache/hive/service/cli/TableSchema.java    |   4 +-
 .../java/org/apache/hive/service/cli/Type.java  | 348 ---------------
 .../apache/hive/service/cli/TypeDescriptor.java |   1 +
 .../cli/operation/GetCatalogsOperation.java     |   2 +-
 .../cli/operation/GetColumnsOperation.java      |   4 +-
 .../cli/operation/GetFunctionsOperation.java    |   8 +-
 .../cli/operation/GetSchemasOperation.java      |   5 +-
 .../cli/operation/GetTableTypesOperation.java   |   9 +-
 .../cli/operation/GetTablesOperation.java       |   2 +-
 .../cli/operation/GetTypeInfoOperation.java     |   4 +-
 .../cli/operation/HiveCommandOperation.java     |   4 +-
 .../service/cli/operation/OperationManager.java |  12 +-
 .../service/cli/operation/SQLOperation.java     |  20 +-
 .../service/cli/session/HiveSessionImpl.java    |   8 +-
 .../service/cli/thrift/ThriftCLIService.java    |   2 +-
 .../apache/hive/service/cli/CLIServiceTest.java |   2 +-
 .../org/apache/hive/service/cli/TestColumn.java |  14 +-
 59 files changed, 2066 insertions(+), 1049 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/beeline/src/java/org/apache/hive/beeline/Commands.java
----------------------------------------------------------------------
diff --git a/beeline/src/java/org/apache/hive/beeline/Commands.java b/beeline/src/java/org/apache/hive/beeline/Commands.java
index 22e2066..0178333 100644
--- a/beeline/src/java/org/apache/hive/beeline/Commands.java
+++ b/beeline/src/java/org/apache/hive/beeline/Commands.java
@@ -1179,7 +1179,7 @@ public class Commands {
   private void showRemainingLogsIfAny(Statement statement) {
     if (statement instanceof HiveStatement) {
       HiveStatement hiveStatement = (HiveStatement) statement;
-      List<String> logs;
+      List<String> logs = null;
       do {
         try {
           logs = hiveStatement.getQueryLog();

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 5cf1609..c52b9d9 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2037,6 +2037,7 @@ public class HiveConf extends Configuration {
         new TimeValidator(TimeUnit.SECONDS),
         "Number of seconds a request will wait to acquire the compile lock before giving up. " +
         "Setting it to 0s disables the timeout."),
+
     // HiveServer2 WebUI
     HIVE_SERVER2_WEBUI_BIND_HOST("hive.server2.webui.host", "0.0.0.0", "The host address the HiveServer2 WebUI will listen on"),
     HIVE_SERVER2_WEBUI_PORT("hive.server2.webui.port", 10002, "The port the HiveServer2 WebUI will listen on. This can be"
@@ -2167,6 +2168,7 @@ public class HiveConf extends Configuration {
         new TimeValidator(TimeUnit.SECONDS),
         "Keepalive time (in seconds) for an idle worker thread. When the number of workers exceeds min workers, " +
         "excessive threads are killed after this time interval."),
+
     // Configuration for async thread pool in SessionManager
     HIVE_SERVER2_ASYNC_EXEC_THREADS("hive.server2.async.exec.threads", 100,
         "Number of threads in the async thread pool for HiveServer2"),
@@ -2330,6 +2332,14 @@ public class HiveConf extends Configuration {
     HIVE_SERVER2_THRIFT_CLIENT_PASSWORD("hive.server2.thrift.client.password", "anonymous","Password to use against " +
       "thrift client"),
 
+    // ResultSet serialization settings
+    HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS("hive.server2.thrift.resultset.serialize.in.tasks", false,
+      "Whether we should serialize the Thrift structures used in JDBC ResultSet RPC in task nodes.\n " +
+      "We use SequenceFile and ThriftJDBCBinarySerDe to read and write the final results if this is true."),
+    // TODO: Make use of this config to configure fetch size
+    HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE("hive.server2.thrift.resultset.max.fetch.size", 1000,
+      "Max number of rows sent in one Fetch RPC call by the server to the client."),
+
     HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist", "set,reset,dfs,add,list,delete,reload,compile",
         "Comma separated list of non-SQL Hive commands users are authorized to execute"),
 
@@ -3646,6 +3656,7 @@ public class HiveConf extends Configuration {
     ConfVars.HIVE_RESULTSET_USE_UNIQUE_COLUMN_NAMES.varname,
     ConfVars.HIVE_STATS_COLLECT_PART_LEVEL_STATS.varname,
     ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LEVEL.varname,
+    ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS.varname,
     ConfVars.HIVE_SUPPORT_SQL11_RESERVED_KEYWORDS.varname,
     ConfVars.JOB_DEBUG_CAPTURE_STACKTRACES.varname,
     ConfVars.JOB_DEBUG_TIMEOUT.varname,
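
The two options added above are ordinary HiveConf settings, so a client or test can enable task-side result serialization the same way setSerializeInTasksInConf() does in the MiniHS2 tests below. A minimal sketch (property names and values are taken from this patch; the helper name is illustrative):

    import org.apache.hadoop.hive.conf.HiveConf;

    // Illustrative helper only, mirroring the test setup in TestJdbcWithMiniHS2.
    public static HiveConf serializeInTasksConf() {
      HiveConf conf = new HiveConf();
      // Have the final tasks write ThriftJDBCBinarySerDe-encoded row batches to a SequenceFile.
      conf.setBoolean("hive.server2.thrift.resultset.serialize.in.tasks", true);
      // Cap the number of rows packed into one batch / returned per Fetch RPC.
      conf.setInt("hive.server2.thrift.resultset.max.fetch.size", 1000);
      return conf;
    }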

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
index 10c8ff2..815ccfa 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
@@ -431,6 +431,112 @@ public class TestJdbcWithMiniHS2 {
     }
   }
 
+  private void setSerializeInTasksInConf(HiveConf conf) {
+    conf.setBoolean("hive.server2.thrift.resultset.serialize.in.tasks", true);
+    conf.setInt("hive.server2.thrift.resultset.max.fetch.size", 1000);
+  }
+
+  @Test
+  public void testMetadataQueriesWithSerializeThriftInTasks() throws Exception {
+    //stop HiveServer2
+    if (miniHS2.isStarted()) {
+      miniHS2.stop();
+    }
+
+    HiveConf conf = new HiveConf();
+    String userName;
+    setSerializeInTasksInConf(conf);
+    miniHS2 = new MiniHS2(conf);
+    Map<String, String> confOverlay = new HashMap<String, String>();
+    miniHS2.start(confOverlay);
+
+    userName = System.getProperty("user.name");
+    hs2Conn = getConnection(miniHS2.getJdbcURL(), userName, "password");
+    Statement stmt = hs2Conn.createStatement();
+    stmt.execute("drop table if exists testThriftSerializeShow");
+    stmt.execute("create table testThriftSerializeShow (a int)");
+    ResultSet rs = stmt.executeQuery("show tables");
+    assertTrue(rs.next());
+    stmt.execute("describe testThriftSerializeShow");
+    stmt.execute("explain select a from testThriftSerializeShow");
+    stmt.execute("drop table testThriftSerializeShow");
+    stmt.close();
+  }
+
+  @Test
+  public void testSelectThriftSerializeInTasks() throws Exception {
+    if (miniHS2.isStarted()) {
+      miniHS2.stop();
+    }
+
+    HiveConf conf = new HiveConf();
+    String userName;
+    setSerializeInTasksInConf(conf);
+    miniHS2 = new MiniHS2(conf);
+    Map<String, String> confOverlay = new HashMap<String, String>();
+    miniHS2.start(confOverlay);
+
+    userName = System.getProperty("user.name");
+    hs2Conn = getConnection(miniHS2.getJdbcURL(), userName, "password");
+    Statement stmt = hs2Conn.createStatement();
+
+    stmt.execute("drop table if exists testSelectThriftOrders");
+    stmt.execute("drop table if exists testSelectThriftCustomers");
+    stmt.execute("create table testSelectThriftOrders (orderid int, orderdate string, customerid int)");
+    stmt.execute("create table testSelectThriftCustomers (customerid int, customername string, customercountry string)");
+    stmt.execute("insert into testSelectThriftOrders values (1, '2015-09-09', 123), (2, '2015-10-10', 246), (3, '2015-11-11', 356)");
+    stmt.execute("insert into testSelectThriftCustomers values (123, 'David', 'America'), (246, 'John', 'Canada'), (356, 'Mary', 'CostaRica')");
+    ResultSet countOrders = stmt.executeQuery("select count(*) from testSelectThriftOrders");
+    while (countOrders.next()) {
+       assertEquals(3, countOrders.getInt(1));
+    }
+    ResultSet maxOrders = stmt.executeQuery("select max(customerid) from testSelectThriftCustomers");
+    while (maxOrders.next()) {
+      assertEquals(356, maxOrders.getInt(1));
+    }
+    stmt.execute("drop table testSelectThriftOrders");
+    stmt.execute("drop table testSelectThriftCustomers");
+    stmt.close();
+  }
+
+  @Test
+  public void testJoinThriftSerializeInTasks() throws Exception {
+    //stop HiveServer2
+    if (miniHS2.isStarted()) {
+      miniHS2.stop();
+    }
+    HiveConf conf = new HiveConf();
+    String userName;
+
+    setSerializeInTasksInConf(conf);
+
+    miniHS2 = new MiniHS2(conf);
+    Map<String, String> confOverlay = new HashMap<String, String>();
+    miniHS2.start(confOverlay);
+
+    userName = System.getProperty("user.name");
+    hs2Conn = getConnection(miniHS2.getJdbcURL(), userName, "password");
+    Statement stmt = hs2Conn.createStatement();
+    stmt.execute("drop table if exists testThriftJoinOrders");
+    stmt.execute("drop table if exists testThriftJoinCustomers");
+    stmt.execute("create table testThriftJoinOrders (orderid int, orderdate string, customerid int)");
+    stmt.execute("create table testThriftJoinCustomers (customerid int, customername string, customercountry string)");
+    stmt.execute("insert into testThriftJoinOrders values (1, '2015-09-09', 123), (2, '2015-10-10', 246), (3, '2015-11-11', 356)");
+    stmt.execute("insert into testThriftJoinCustomers values (123, 'David', 'America'), (246, 'John', 'Canada'), (356, 'Mary', 'CostaRica')");
+    ResultSet joinResultSet = stmt.executeQuery("select testThriftJoinOrders.orderid, testThriftJoinCustomers.customername from testThriftJoinOrders inner join testThriftJoinCustomers where testThriftJoinOrders.customerid=testThriftJoinCustomers.customerid");
+    Map<Integer, String> expectedResult = new HashMap<Integer, String>();
+    expectedResult.put(1, "David");
+    expectedResult.put(2, "John");
+    expectedResult.put(3, "Mary");
+    for (int i = 1; i < 4; i++) {
+      assertTrue(joinResultSet.next());
+      assertEquals(joinResultSet.getString(2), expectedResult.get(i));
+    }
+    stmt.execute("drop table testThriftJoinOrders");
+    stmt.execute("drop table testThriftJoinCustomers");
+    stmt.close();
+  }
+
   /**
    * Tests the creation of the 3 scratch dirs: hdfs, local, downloaded resources (which is also local).
    * 1. Test with doAs=false: open a new JDBC session and verify the presence of directories/permissions
@@ -810,4 +916,4 @@ public class TestJdbcWithMiniHS2 {
     }
     return -1;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/jdbc/src/java/org/apache/hive/jdbc/HiveBaseResultSet.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveBaseResultSet.java b/jdbc/src/java/org/apache/hive/jdbc/HiveBaseResultSet.java
index 88ba853..93f093f 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/HiveBaseResultSet.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/HiveBaseResultSet.java
@@ -46,8 +46,8 @@ import java.util.Map;
 
 import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
 import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
+import org.apache.hadoop.hive.serde2.thrift.Type;
 import org.apache.hive.service.cli.TableSchema;
-import org.apache.hive.service.cli.Type;
 
 /**
  * Data independent base class which implements the common part of

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/jdbc/src/java/org/apache/hive/jdbc/HiveResultSetMetaData.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveResultSetMetaData.java b/jdbc/src/java/org/apache/hive/jdbc/HiveResultSetMetaData.java
index 16a0894..f6c38d8 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/HiveResultSetMetaData.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/HiveResultSetMetaData.java
@@ -22,7 +22,7 @@ import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.util.List;
 
-import org.apache.hive.service.cli.Type;
+import org.apache.hadoop.hive.serde2.thrift.Type;
 
 /**
  * HiveResultSetMetaData.

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
index f5b9672..3cc6b74 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
@@ -35,6 +35,7 @@ import org.apache.hive.service.rpc.thrift.TGetOperationStatusReq;
 import org.apache.hive.service.rpc.thrift.TGetOperationStatusResp;
 import org.apache.hive.service.rpc.thrift.TOperationHandle;
 import org.apache.hive.service.rpc.thrift.TSessionHandle;
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -881,15 +882,22 @@ public class HiveStatement implements java.sql.Statement {
       }
     } catch (SQLException e) {
       throw e;
+    } catch (TException e) {
+      throw new SQLException("Error when getting query log: " + e, e);
     } catch (Exception e) {
       throw new SQLException("Error when getting query log: " + e, e);
     }
 
-    RowSet rowSet = RowSetFactory.create(tFetchResultsResp.getResults(),
-        connection.getProtocol());
-    for (Object[] row : rowSet) {
-      logs.add(String.valueOf(row[0]));
+    try {
+      RowSet rowSet;
+      rowSet = RowSetFactory.create(tFetchResultsResp.getResults(), connection.getProtocol());
+      for (Object[] row : rowSet) {
+        logs.add(String.valueOf(row[0]));
+      }
+    } catch (TException e) {
+      throw new SQLException("Error building result set for query log: " + e, e);
     }
+
     return logs;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/jdbc/src/java/org/apache/hive/jdbc/JdbcColumn.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/JdbcColumn.java b/jdbc/src/java/org/apache/hive/jdbc/JdbcColumn.java
index 691fd0e..5aed679 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/JdbcColumn.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/JdbcColumn.java
@@ -27,7 +27,7 @@ import java.sql.Types;
 import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
 import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
 import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hive.service.cli.Type;
+import org.apache.hadoop.hive.serde2.thrift.Type;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/pom.xml
----------------------------------------------------------------------
diff --git a/ql/pom.xml b/ql/pom.xml
index ebb9599..aaa3271 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -70,6 +70,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hive</groupId>
+      <artifactId>hive-service-rpc</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
       <artifactId>hive-llap-client</artifactId>
       <version>${project.version}</version>
     </dependency>
@@ -803,6 +808,7 @@
                   <include>org.apache.hive:hive-serde</include>
                   <include>org.apache.hive:hive-llap-client</include>
                   <include>org.apache.hive:hive-metastore</include>
+                  <include>org.apache.hive:hive-service-rpc</include>
                   <include>com.esotericsoftware:kryo-shaded</include>
 		  <include>com.esotericsoftware:minlog</include>
 		  <include>org.objenesis:objenesis</include>

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 65744ac..48fb060 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -108,6 +108,7 @@ import org.apache.hadoop.hive.ql.session.OperationLog.LoggingLevel;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.thrift.ThriftJDBCBinarySerDe;
 import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobClient;
@@ -932,6 +933,13 @@ public class Driver implements CommandProcessor {
     return plan;
   }
 
+  /**
+   * @return The current FetchTask associated with the Driver's plan, if any.
+   */
+  public FetchTask getFetchTask() {
+    return fetchTask;
+  }
+
   // Write the current set of valid transactions into the conf file so that it can be read by
   // the input format.
   private void recordValidTxns() throws LockException {
@@ -1880,6 +1888,17 @@ public class Driver implements CommandProcessor {
       throw new IOException("FAILED: Operation cancelled");
     }
     if (isFetchingTable()) {
+      /**
+       * If resultset serialization to thrift object is enabled, and if the destination table is
+       * indeed written using ThriftJDBCBinarySerDe, read one row from the output sequence file,
+       * since it is a blob of row batches.
+       */
+      if (fetchTask.getWork().isHiveServerQuery() && HiveConf.getBoolVar(conf,
+          HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS)
+          && (fetchTask.getTblDesc().getSerdeClassName()
+              .equalsIgnoreCase(ThriftJDBCBinarySerDe.class.getName()))) {
+        maxRows = 1;
+      }
       fetchTask.setMaxRows(maxRows);
       return fetchTask.fetch(res);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultFetchFormatter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultFetchFormatter.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultFetchFormatter.java
deleted file mode 100644
index b8be3a5..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultFetchFormatter.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT;
-import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_NULL_FORMAT;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.common.JavaUtils;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.serde2.DelimitedJSONSerDe;
-import org.apache.hadoop.hive.serde2.SerDe;
-import org.apache.hadoop.hive.serde2.SerDeUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hive.common.util.ReflectionUtil;
-
-/**
- * serialize row by user specified serde and call toString() to make string type result
- */
-public class DefaultFetchFormatter<T> implements FetchFormatter<String> {
-
-  private SerDe mSerde;
-
-  @Override
-  public void initialize(Configuration hconf, Properties props) throws HiveException {
-    try {
-      mSerde = initializeSerde(hconf, props);
-    } catch (Exception e) {
-      throw new HiveException(e);
-    }
-  }
-
-  private SerDe initializeSerde(Configuration conf, Properties props) throws Exception {
-    String serdeName = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEFETCHOUTPUTSERDE);
-    Class<? extends SerDe> serdeClass = Class.forName(serdeName, true,
-        Utilities.getSessionSpecifiedClassLoader()).asSubclass(SerDe.class);
-    // cast only needed for Hadoop 0.17 compatibility
-    SerDe serde = ReflectionUtil.newInstance(serdeClass, null);
-
-    Properties serdeProps = new Properties();
-    if (serde instanceof DelimitedJSONSerDe) {
-      serdeProps.put(SERIALIZATION_FORMAT, props.getProperty(SERIALIZATION_FORMAT));
-      serdeProps.put(SERIALIZATION_NULL_FORMAT, props.getProperty(SERIALIZATION_NULL_FORMAT));
-    }
-    SerDeUtils.initializeSerDe(serde, conf, serdeProps, null);
-    return serde;
-  }
-
-  @Override
-  public String convert(Object row, ObjectInspector rowOI) throws Exception {
-    return mSerde.serialize(row, rowOI).toString();
-  }
-
-  @Override
-  public void close() throws IOException {
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchFormatter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchFormatter.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchFormatter.java
deleted file mode 100644
index c2ed0d6..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchFormatter.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.serde2.SerDeUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-
-/**
- * internal-use only
- *
- * Used in ListSinkOperator for formatting final output
- */
-public interface FetchFormatter<T> extends Closeable {
-
-  void initialize(Configuration hconf, Properties props) throws Exception;
-
-  T convert(Object row, ObjectInspector rowOI) throws Exception;
-
-  public static class ThriftFormatter implements FetchFormatter<Object> {
-
-    int protocol;
-
-    @Override
-    public void initialize(Configuration hconf, Properties props) throws Exception {
-      protocol = hconf.getInt(ListSinkOperator.OUTPUT_PROTOCOL, 0);
-    }
-
-    @Override
-    public Object convert(Object row, ObjectInspector rowOI) throws Exception {
-      StructObjectInspector structOI = (StructObjectInspector) rowOI;
-      List<? extends StructField> fields = structOI.getAllStructFieldRefs();
-
-      Object[] converted = new Object[fields.size()];
-      for (int i = 0 ; i < converted.length; i++) {
-        StructField fieldRef = fields.get(i);
-        Object field = structOI.getStructFieldData(row, fieldRef);
-        converted[i] = field == null ? null :
-            SerDeUtils.toThriftPayload(field, fieldRef.getFieldObjectInspector(), protocol);
-      }
-      return converted;
-    }
-
-    @Override
-    public void close() throws IOException {
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
index 0b0c336..b96ea04 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
@@ -48,12 +48,10 @@ import org.apache.hadoop.util.StringUtils;
  **/
 public class FetchTask extends Task<FetchWork> implements Serializable {
   private static final long serialVersionUID = 1L;
-
   private int maxRows = 100;
   private FetchOperator fetch;
   private ListSinkOperator sink;
   private int totalRows;
-
   private static transient final Logger LOG = LoggerFactory.getLogger(FetchTask.class);
 
   public FetchTask() {
@@ -186,4 +184,5 @@ public class FetchTask extends Task<FetchWork> implements Serializable {
       fetch.clearFetchContext();
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index ec6381b..3ec63ee 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.SubStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.serde2.thrift.ThriftJDBCBinarySerDe;
 import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyShim;
 import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyValue;
 import org.apache.hadoop.hive.shims.ShimLoader;
@@ -122,7 +123,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
   protected transient long numRows = 0;
   protected transient long cntr = 1;
   protected transient long logEveryNRows = 0;
-
+  protected transient int rowIndex = 0;
   /**
    * Counters.
    */
@@ -374,7 +375,6 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
       // half of the script.timeout but less than script.timeout, we will still
       // be able to report progress.
       timeOut = hconf.getInt("mapred.healthChecker.script.timeout", 600000) / 2;
-
       if (hconf instanceof JobConf) {
         jc = (JobConf) hconf;
       } else {
@@ -656,6 +656,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
 
   protected Writable recordValue;
 
+
   @Override
   public void process(Object row, int tag) throws HiveException {
     /* Create list bucketing sub-directory only if stored-as-directories is on. */
@@ -717,8 +718,12 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
         } else {
           fpaths = fsp;
         }
-        // use SerDe to serialize r, and write it out
         recordValue = serializer.serialize(row, inputObjInspectors[0]);
+        // if serializer is ThriftJDBCBinarySerDe, then recordValue is null if the buffer is not full (the size of buffer
+        // is kept track of in the SerDe)
+        if (recordValue == null) {
+          return;
+        }
       }
 
       rowOutWriters = fpaths.outWriters;
@@ -745,6 +750,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
         LOG.info(toString() + ": records written - " + numRows);
       }
 
+      // This should always be 0 for the final result file
       int writerOffset = findWriterOffset(row);
       // This if/else chain looks ugly in the inner loop, but given that it will be 100% the same
       // for a given operator branch prediction should work quite nicely on it.
@@ -1012,9 +1018,22 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
 
     lastProgressReport = System.currentTimeMillis();
     if (!abort) {
+      // If serializer is ThriftJDBCBinarySerDe, then it buffers rows to a certain limit (hive.server2.thrift.resultset.max.fetch.size)
+      // and serializes the whole batch when the buffer is full. The serialize returns null if the buffer is not full
+      // (the size of buffer is kept track of in the ThriftJDBCBinarySerDe).
+      if (conf.isHiveServerQuery() && HiveConf.getBoolVar(hconf,
+          HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS) &&
+          serializer.getClass().getName().equalsIgnoreCase(ThriftJDBCBinarySerDe.class.getName())) {
+          try {
+            recordValue = serializer.serialize(null, inputObjInspectors[0]);
+            rowOutWriters = fpaths.outWriters;
+            rowOutWriters[0].write(recordValue);
+          } catch (SerDeException | IOException e) {
+            throw new HiveException(e);
+          }
+      }
       for (FSPaths fsp : valToPaths.values()) {
         fsp.closeWriters(abort);
-
         // before closing the operator check if statistics gathering is requested
         // and is provided by record writer. this is different from the statistics
         // gathering done in processOp(). In processOp(), for each row added
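
The process() and closeOp() changes above rely on the buffering contract described in the comments: when ThriftJDBCBinarySerDe is the serializer, serialize() returns null until hive.server2.thrift.resultset.max.fetch.size rows have accumulated, and a final serialize(null, oi) call at close flushes whatever remains as one last blob. A standalone sketch of that contract (plain Java with illustrative names, not the actual SerDe code, which returns a Writable holding a serialized TRowSet):

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative batching pattern only.
    class BatchingSerializerSketch {
      private final int maxFetchSize;
      private final List<Object> buffer = new ArrayList<>();

      BatchingSerializerSketch(int maxFetchSize) {
        this.maxFetchSize = maxFetchSize;
      }

      // Mirrors serialize(row, oi): returns null while the batch is still filling,
      // the full batch once maxFetchSize rows are buffered, and (for row == null,
      // the flush call made from closeOp) whatever partial batch remains.
      List<Object> serialize(Object row) {
        if (row != null) {
          buffer.add(row);
          if (buffer.size() < maxFetchSize) {
            return null;          // caller writes nothing for this row yet
          }
        }
        if (buffer.isEmpty()) {
          return null;            // nothing left to flush
        }
        List<Object> batch = new ArrayList<>(buffer);
        buffer.clear();
        return batch;             // one returned value == one row-batch blob in the output file
      }
    }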

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java
index b081cd0..9bf363c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java
@@ -27,6 +27,9 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ListSinkDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.DefaultFetchFormatter;
+import org.apache.hadoop.hive.serde2.FetchFormatter;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /**
@@ -34,10 +37,6 @@ import org.apache.hadoop.util.ReflectionUtils;
  * and finally arrives to this operator.
  */
 public class ListSinkOperator extends Operator<ListSinkDesc> {
-
-  public static final String OUTPUT_FORMATTER = "output.formatter";
-  public static final String OUTPUT_PROTOCOL = "output.protocol";
-
   private transient List res;
   private transient FetchFormatter fetcher;
   private transient int numRows;
@@ -62,7 +61,7 @@ public class ListSinkOperator extends Operator<ListSinkDesc> {
   }
 
   private FetchFormatter initializeFetcher(Configuration conf) throws Exception {
-    String formatterName = conf.get(OUTPUT_FORMATTER);
+    String formatterName = conf.get(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER);
     FetchFormatter fetcher;
     if (formatterName != null && !formatterName.isEmpty()) {
       Class<? extends FetchFormatter> fetcherClass = Class.forName(formatterName, true,
@@ -71,12 +70,10 @@ public class ListSinkOperator extends Operator<ListSinkDesc> {
     } else {
       fetcher = new DefaultFetchFormatter();
     }
-
     // selectively used by fetch formatter
     Properties props = new Properties();
     props.put(serdeConstants.SERIALIZATION_FORMAT, "" + Utilities.tabCode);
     props.put(serdeConstants.SERIALIZATION_NULL_FORMAT, getConf().getSerializationNullFormat());
-
     fetcher.initialize(conf, props);
     return fetcher;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java
index cf3bbf0..de7b151 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java
@@ -263,6 +263,11 @@ public class QB {
     this.isQuery = isQuery;
   }
 
+  /**
+   * Set to true in SemanticAnalyzer.getMetadataForDestFile,
+   * if destination is a file and query is not CTAS
+   * @return
+   */
   public boolean getIsQuery() {
     return isQuery;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 96df189..005b53f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -203,6 +203,7 @@ import org.apache.hadoop.hive.ql.util.ResourceDownloader;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe;
+import org.apache.hadoop.hive.serde2.NoOpFetchFormatter;
 import org.apache.hadoop.hive.serde2.NullStructSerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
@@ -214,6 +215,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.thrift.ThriftJDBCBinarySerDe;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
@@ -6834,8 +6836,23 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
 
       if (tblDesc == null) {
         if (qb.getIsQuery()) {
-          String fileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
-          table_desc = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, fileFormat);
+          String fileFormat;
+          if (SessionState.get().isHiveServerQuery() &&
+                   conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS)) {
+              fileFormat = "SequenceFile";
+              HiveConf.setVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT, fileFormat);
+              table_desc=
+                         PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, fileFormat,
+                           ThriftJDBCBinarySerDe.class);
+              // Set the fetch formatter to be a no-op for the ListSinkOperator, since we'll
+              // write out formatted thrift objects to SequenceFile
+              conf.set(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER, NoOpFetchFormatter.class.getName());
+          } else {
+              fileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
+              table_desc =
+                         PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, fileFormat,
+                           LazySimpleSerDe.class);
+          }
         } else {
           table_desc = PlanUtils.getDefaultTableDesc(qb.getDirectoryDesc(), cols, colTypes);
         }
@@ -6907,6 +6924,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       dpCtx,
       dest_path);
 
+    fileSinkDesc.setHiveServerQuery(SessionState.get().isHiveServerQuery());
     // If this is an insert, update, or delete on an ACID table then mark that so the
     // FileSinkOperator knows how to properly write to it.
     if (destTableIsAcid) {

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index f7d7a40..75ca5f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -58,7 +58,15 @@ import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.DefaultFetchFormatter;
+import org.apache.hadoop.hive.serde2.NoOpFetchFormatter;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.thrift.ThriftFormatter;
+import org.apache.hadoop.hive.serde2.thrift.ThriftJDBCBinarySerDe;
 
 import com.google.common.collect.Interner;
 import com.google.common.collect.Interners;
@@ -97,6 +105,20 @@ public abstract class TaskCompiler {
     int outerQueryLimit = pCtx.getQueryProperties().getOuterQueryLimit();
 
     if (pCtx.getFetchTask() != null) {
+      if (pCtx.getFetchTask().getTblDesc() == null) {
+        return;
+      }
+      pCtx.getFetchTask().getWork().setHiveServerQuery(SessionState.get().isHiveServerQuery());
+      TableDesc resultTab = pCtx.getFetchTask().getTblDesc();
+      // If the serializer is ThriftJDBCBinarySerDe, then it requires that NoOpFetchFormatter be used. But when it isn't,
+      // then either the ThriftFormatter or the DefaultFetchFormatter should be used.
+      if (!resultTab.getSerdeClassName().equalsIgnoreCase(ThriftJDBCBinarySerDe.class.getName())) {
+        if (SessionState.get().isHiveServerQuery()) {
+          conf.set(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER,ThriftFormatter.class.getName());
+        } else {
+          conf.set(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER, DefaultFetchFormatter.class.getName());
+        }
+      }
       return;
     }
 
@@ -117,13 +139,34 @@ public abstract class TaskCompiler {
       String cols = loadFileDesc.getColumns();
       String colTypes = loadFileDesc.getColumnTypes();
 
+      String resFileFormat;
       TableDesc resultTab = pCtx.getFetchTableDesc();
       if (resultTab == null) {
-        String resFileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
-        resultTab = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat);
+        resFileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
+        if (SessionState.get().isHiveServerQuery() && (conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS))
+            && (resFileFormat.equalsIgnoreCase("SequenceFile"))) {
+          resultTab =
+              PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat,
+                  ThriftJDBCBinarySerDe.class);
+          // Set the fetch formatter to be a no-op for the ListSinkOperator, since we'll
+          // read formatted thrift objects from the output SequenceFile written by Tasks.
+          conf.set(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER, NoOpFetchFormatter.class.getName());
+        } else {
+          resultTab =
+              PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat,
+                  LazySimpleSerDe.class);
+        }
+      } else {
+        if (resultTab.getProperties().getProperty(serdeConstants.SERIALIZATION_LIB)
+            .equalsIgnoreCase(ThriftJDBCBinarySerDe.class.getName())) {
+          // Set the fetch formatter to be a no-op for the ListSinkOperator, since we'll
+          // read formatted thrift objects from the output SequenceFile written by Tasks.
+          conf.set(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER, NoOpFetchFormatter.class.getName());
+        }
       }
 
       FetchWork fetch = new FetchWork(loadFileDesc.getSourcePath(), resultTab, outerQueryLimit);
+      fetch.setHiveServerQuery(SessionState.get().isHiveServerQuery());
       fetch.setSource(pCtx.getFetchSource());
       fetch.setSink(pCtx.getFetchSink());
 
@@ -322,8 +365,19 @@ public abstract class TaskCompiler {
     String cols = loadFileWork.get(0).getColumns();
     String colTypes = loadFileWork.get(0).getColumnTypes();
 
-    String resFileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
-    TableDesc resultTab = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat);
+    String resFileFormat;
+    TableDesc resultTab;
+    if (SessionState.get().isHiveServerQuery() && conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS)) {
+      resFileFormat = "SequenceFile";
+      resultTab =
+          PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat,
+              ThriftJDBCBinarySerDe.class);
+    } else {
+      resFileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
+      resultTab =
+          PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat,
+              LazySimpleSerDe.class);
+    }
 
     fetch = new FetchWork(loadFileWork.get(0).getSourcePath(), resultTab, outerQueryLimit);
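
Taken together, the TaskCompiler branches above choose the ListSinkOperator formatter from the serde of the fetch table. A hedged summary of that decision as a standalone sketch (class names spelled out as strings for illustration; this mirrors the branches rather than calling any Hive API):

    // Illustrative only.
    final class FormatterChoiceSketch {
      static String chooseListSinkFormatter(String resultSerdeClass, boolean isHiveServerQuery) {
        if ("org.apache.hadoop.hive.serde2.thrift.ThriftJDBCBinarySerDe".equalsIgnoreCase(resultSerdeClass)) {
          // Tasks already wrote thrift row batches into the SequenceFile; nothing left to format.
          return "org.apache.hadoop.hive.serde2.NoOpFetchFormatter";
        }
        return isHiveServerQuery
            ? "org.apache.hadoop.hive.serde2.thrift.ThriftFormatter"    // HS2 builds thrift rows at fetch time
            : "org.apache.hadoop.hive.serde2.DefaultFetchFormatter";    // plain string formatting for other clients
      }
    }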
 

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java
index d68c64c..8ea6440 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java
@@ -63,6 +63,16 @@ public class FetchWork implements Serializable {
    */
   private String serializationNullFormat = "NULL";
 
+  private boolean isHiveServerQuery;
+
+  public boolean isHiveServerQuery() {
+	return isHiveServerQuery;
+  }
+
+  public void setHiveServerQuery(boolean isHiveServerQuery) {
+	this.isHiveServerQuery = isHiveServerQuery;
+  }
+
   public FetchWork() {
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index 07fd2dc..0064fca 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -97,6 +97,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
 
   private transient Table table;
   private Path destPath;
+  private boolean isHiveServerQuery;
 
   public FileSinkDesc() {
   }
@@ -160,6 +161,14 @@ public class FileSinkDesc extends AbstractOperatorDesc {
     return ret;
   }
 
+  public boolean isHiveServerQuery() {
+	  return this.isHiveServerQuery;
+  }
+
+  public void setHiveServerQuery(boolean isHiveServerQuery) {
+	  this.isHiveServerQuery = isHiveServerQuery;
+  }
+
   @Explain(displayName = "directory", explainLevels = { Level.EXTENDED })
   public Path getDirName() {
     return dirName;

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
index 2992568..c39a46f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
@@ -280,13 +280,13 @@ public final class PlanUtils {
   }
 
   public static TableDesc getDefaultQueryOutputTableDesc(String cols, String colTypes,
-      String fileFormat) {
-    TableDesc tblDesc = getTableDesc(LazySimpleSerDe.class, "" + Utilities.ctrlaCode, cols, colTypes,
-        false, false, fileFormat);
-    //enable escaping
+      String fileFormat, Class<? extends Deserializer> serdeClass) {
+    TableDesc tblDesc =
+        getTableDesc(serdeClass, "" + Utilities.ctrlaCode, cols, colTypes, false, false, fileFormat);
+    // enable escaping
     tblDesc.getProperties().setProperty(serdeConstants.ESCAPE_CHAR, "\\");
     tblDesc.getProperties().setProperty(serdeConstants.SERIALIZATION_ESCAPE_CRLF, "true");
-    //enable extended nesting levels
+    // enable extended nesting levels
     tblDesc.getProperties().setProperty(
         LazySerDeParameters.SERIALIZATION_EXTEND_ADDITIONAL_NESTING_LEVELS, "true");
     return tblDesc;

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/serde/pom.xml
----------------------------------------------------------------------
diff --git a/serde/pom.xml b/serde/pom.xml
index cea7fce..9f50764 100644
--- a/serde/pom.xml
+++ b/serde/pom.xml
@@ -41,6 +41,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hive</groupId>
+      <artifactId>hive-service-rpc</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
       <artifactId>hive-shims</artifactId>
       <version>${project.version}</version>
     </dependency>

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/serde/src/java/org/apache/hadoop/hive/serde2/DefaultFetchFormatter.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/DefaultFetchFormatter.java b/serde/src/java/org/apache/hadoop/hive/serde2/DefaultFetchFormatter.java
new file mode 100644
index 0000000..3038037
--- /dev/null
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/DefaultFetchFormatter.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT;
+import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_NULL_FORMAT;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hive.common.util.ReflectionUtil;
+
+/**
+ * serialize row by user specified serde and call toString() to make string type result
+ */
+public class DefaultFetchFormatter<T> implements FetchFormatter<String> {
+
+  private SerDe mSerde;
+
+  @Override
+  public void initialize(Configuration hconf, Properties props) throws SerDeException {
+    mSerde = initializeSerde(hconf, props);
+  }
+
+  private SerDe initializeSerde(Configuration conf, Properties props) throws SerDeException {
+    String serdeName = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEFETCHOUTPUTSERDE);
+    Class<? extends SerDe> serdeClass;
+    try {
+      serdeClass =
+          Class.forName(serdeName, true, JavaUtils.getClassLoader()).asSubclass(SerDe.class);
+    } catch (ClassNotFoundException e) {
+      throw new SerDeException(e);
+    }
+    // cast only needed for Hadoop 0.17 compatibility
+    SerDe serde = ReflectionUtil.newInstance(serdeClass, null);
+    Properties serdeProps = new Properties();
+    if (serde instanceof DelimitedJSONSerDe) {
+      serdeProps.put(SERIALIZATION_FORMAT, props.getProperty(SERIALIZATION_FORMAT));
+      serdeProps.put(SERIALIZATION_NULL_FORMAT, props.getProperty(SERIALIZATION_NULL_FORMAT));
+    }
+    SerDeUtils.initializeSerDe(serde, conf, serdeProps, null);
+    return serde;
+  }
+
+  @Override
+  public String convert(Object row, ObjectInspector rowOI) throws Exception {
+    return mSerde.serialize(row, rowOI).toString();
+  }
+
+  @Override
+  public void close() throws IOException {
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/serde/src/java/org/apache/hadoop/hive/serde2/FetchFormatter.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/FetchFormatter.java b/serde/src/java/org/apache/hadoop/hive/serde2/FetchFormatter.java
new file mode 100644
index 0000000..5cc93aa
--- /dev/null
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/FetchFormatter.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2;
+
+import java.io.Closeable;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+/**
+ * internal-use only
+ *
+ * Used in ListSinkOperator for formatting final output
+ */
+public interface FetchFormatter<T> extends Closeable {
+
+  void initialize(Configuration hconf, Properties props) throws Exception;
+
+  T convert(Object row, ObjectInspector rowOI) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/serde/src/java/org/apache/hadoop/hive/serde2/NoOpFetchFormatter.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/NoOpFetchFormatter.java b/serde/src/java/org/apache/hadoop/hive/serde2/NoOpFetchFormatter.java
new file mode 100644
index 0000000..91929f1
--- /dev/null
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/NoOpFetchFormatter.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+/**
+ * A no-op fetch formatter.
+ * ListSinkOperator uses this when reading from a destination table whose data was serialized by
+ * ThriftJDBCBinarySerDe into a SequenceFile.
+ */
+public class NoOpFetchFormatter<T> implements FetchFormatter<Object> {
+
+  @Override
+  public void initialize(Configuration hconf, Properties props) throws SerDeException {
+  }
+
+  // This wraps the row in an Object[] without converting it, because this formatter is only
+  // used when ThriftJDBCBinarySerDe has already serialized the rows into thrift-able objects.
+  @Override
+  public Object convert(Object row, ObjectInspector rowOI) throws Exception {
+    return new Object[] { row };
+  }
+
+  @Override
+  public void close() throws IOException {
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java b/serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java
index 90439a2..6e08dfd 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java
@@ -71,7 +71,8 @@ public final class SerDeUtils {
 
   // lower case null is used within json objects
   private static final String JSON_NULL = "null";
-
+  public static final String LIST_SINK_OUTPUT_FORMATTER = "list.sink.output.formatter";
+  public static final String LIST_SINK_OUTPUT_PROTOCOL = "list.sink.output.protocol";
   public static final Logger LOG = LoggerFactory.getLogger(SerDeUtils.class.getName());
 
   /**

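These two keys let the fetch side pick the output formatter class and the thrift protocol version at runtime. The sketch below only illustrates how the formatter property can be resolved; the actual wiring is done in ListSinkOperator, and FormatterSelector and select() are invented names for the example.

// Illustration only: resolving a FetchFormatter from the list.sink.output.formatter property.
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.DefaultFetchFormatter;
import org.apache.hadoop.hive.serde2.FetchFormatter;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.util.ReflectionUtils;

public final class FormatterSelector {

  public static FetchFormatter<?> select(Configuration conf, Properties props) throws Exception {
    String formatterName = conf.get(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER);
    FetchFormatter<?> formatter;
    if (formatterName == null || formatterName.isEmpty()) {
      // Fall back to the text-based default formatter.
      formatter = new DefaultFetchFormatter();
    } else {
      formatter = (FetchFormatter<?>) ReflectionUtils.newInstance(
          conf.getClassByName(formatterName), conf);
    }
    formatter.initialize(conf, props);
    return formatter;
  }
}
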
http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ColumnBuffer.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ColumnBuffer.java b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ColumnBuffer.java
new file mode 100644
index 0000000..929c405
--- /dev/null
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ColumnBuffer.java
@@ -0,0 +1,439 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.thrift;
+
+import java.nio.ByteBuffer;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.hive.service.rpc.thrift.TBinaryColumn;
+import org.apache.hive.service.rpc.thrift.TBoolColumn;
+import org.apache.hive.service.rpc.thrift.TByteColumn;
+import org.apache.hive.service.rpc.thrift.TColumn;
+import org.apache.hive.service.rpc.thrift.TDoubleColumn;
+import org.apache.hive.service.rpc.thrift.TI16Column;
+import org.apache.hive.service.rpc.thrift.TI32Column;
+import org.apache.hive.service.rpc.thrift.TI64Column;
+import org.apache.hive.service.rpc.thrift.TStringColumn;
+
+import com.google.common.primitives.Booleans;
+import com.google.common.primitives.Bytes;
+import com.google.common.primitives.Doubles;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+import com.google.common.primitives.Shorts;
+
+/**
+ * ColumnBuffer holds a single column of values of one type, backed by a primitive array or a list
+ * of String/ByteBuffer values, with a BitSet tracking which rows are null. It can be converted to
+ * and from a Thrift TColumn.
+ */
+public class ColumnBuffer extends AbstractList {
+
+  private static final int DEFAULT_SIZE = 100;
+
+  private final Type type;
+
+  private BitSet nulls;
+
+  private int size;
+  private boolean[] boolVars;
+  private byte[] byteVars;
+  private short[] shortVars;
+  private int[] intVars;
+  private long[] longVars;
+  private double[] doubleVars;
+  private List<String> stringVars;
+  private List<ByteBuffer> binaryVars;
+
+  public ColumnBuffer(Type type, BitSet nulls, Object values) {
+    this.type = type;
+    this.nulls = nulls;
+    if (type == Type.BOOLEAN_TYPE) {
+      boolVars = (boolean[]) values;
+      size = boolVars.length;
+    } else if (type == Type.TINYINT_TYPE) {
+      byteVars = (byte[]) values;
+      size = byteVars.length;
+    } else if (type == Type.SMALLINT_TYPE) {
+      shortVars = (short[]) values;
+      size = shortVars.length;
+    } else if (type == Type.INT_TYPE) {
+      intVars = (int[]) values;
+      size = intVars.length;
+    } else if (type == Type.BIGINT_TYPE) {
+      longVars = (long[]) values;
+      size = longVars.length;
+    } else if (type == Type.DOUBLE_TYPE) {
+      doubleVars = (double[]) values;
+      size = doubleVars.length;
+    } else if (type == Type.BINARY_TYPE) {
+      binaryVars = (List<ByteBuffer>) values;
+      size = binaryVars.size();
+    } else if (type == Type.STRING_TYPE) {
+      stringVars = (List<String>) values;
+      size = stringVars.size();
+    } else {
+      throw new IllegalStateException("invalid column type: " + type);
+    }
+  }
+
+  public ColumnBuffer(Type type) {
+    nulls = new BitSet();
+    switch (type) {
+      case BOOLEAN_TYPE:
+        boolVars = new boolean[DEFAULT_SIZE];
+        break;
+      case TINYINT_TYPE:
+        byteVars = new byte[DEFAULT_SIZE];
+        break;
+      case SMALLINT_TYPE:
+        shortVars = new short[DEFAULT_SIZE];
+        break;
+      case INT_TYPE:
+        intVars = new int[DEFAULT_SIZE];
+        break;
+      case BIGINT_TYPE:
+        longVars = new long[DEFAULT_SIZE];
+        break;
+      case FLOAT_TYPE:
+      case DOUBLE_TYPE:
+        type = Type.DOUBLE_TYPE;
+        doubleVars = new double[DEFAULT_SIZE];
+        break;
+      case BINARY_TYPE:
+        binaryVars = new ArrayList<ByteBuffer>();
+        break;
+      default:
+        type = Type.STRING_TYPE;
+        stringVars = new ArrayList<String>();
+    }
+    this.type = type;
+  }
+
+  public ColumnBuffer(TColumn colValues) {
+    if (colValues.isSetBoolVal()) {
+      type = Type.BOOLEAN_TYPE;
+      nulls = toBitset(colValues.getBoolVal().getNulls());
+      boolVars = Booleans.toArray(colValues.getBoolVal().getValues());
+      size = boolVars.length;
+    } else if (colValues.isSetByteVal()) {
+      type = Type.TINYINT_TYPE;
+      nulls = toBitset(colValues.getByteVal().getNulls());
+      byteVars = Bytes.toArray(colValues.getByteVal().getValues());
+      size = byteVars.length;
+    } else if (colValues.isSetI16Val()) {
+      type = Type.SMALLINT_TYPE;
+      nulls = toBitset(colValues.getI16Val().getNulls());
+      shortVars = Shorts.toArray(colValues.getI16Val().getValues());
+      size = shortVars.length;
+    } else if (colValues.isSetI32Val()) {
+      type = Type.INT_TYPE;
+      nulls = toBitset(colValues.getI32Val().getNulls());
+      intVars = Ints.toArray(colValues.getI32Val().getValues());
+      size = intVars.length;
+    } else if (colValues.isSetI64Val()) {
+      type = Type.BIGINT_TYPE;
+      nulls = toBitset(colValues.getI64Val().getNulls());
+      longVars = Longs.toArray(colValues.getI64Val().getValues());
+      size = longVars.length;
+    } else if (colValues.isSetDoubleVal()) {
+      type = Type.DOUBLE_TYPE;
+      nulls = toBitset(colValues.getDoubleVal().getNulls());
+      doubleVars = Doubles.toArray(colValues.getDoubleVal().getValues());
+      size = doubleVars.length;
+    } else if (colValues.isSetBinaryVal()) {
+      type = Type.BINARY_TYPE;
+      nulls = toBitset(colValues.getBinaryVal().getNulls());
+      binaryVars = colValues.getBinaryVal().getValues();
+      size = binaryVars.size();
+    } else if (colValues.isSetStringVal()) {
+      type = Type.STRING_TYPE;
+      nulls = toBitset(colValues.getStringVal().getNulls());
+      stringVars = colValues.getStringVal().getValues();
+      size = stringVars.size();
+    } else {
+      throw new IllegalStateException("invalid union object");
+    }
+  }
+
+  public ColumnBuffer extractSubset(int start, int end) {
+    BitSet subNulls = nulls.get(start, end);
+    if (type == Type.BOOLEAN_TYPE) {
+      ColumnBuffer subset =
+          new ColumnBuffer(type, subNulls, Arrays.copyOfRange(boolVars, start, end));
+      boolVars = Arrays.copyOfRange(boolVars, end, size);
+      nulls = nulls.get(end, size);
+      size = boolVars.length;
+      return subset;
+    }
+    if (type == Type.TINYINT_TYPE) {
+      ColumnBuffer subset =
+          new ColumnBuffer(type, subNulls, Arrays.copyOfRange(byteVars, start, end));
+      byteVars = Arrays.copyOfRange(byteVars, end, size);
+      nulls = nulls.get(end, size);
+      size = byteVars.length;
+      return subset;
+    }
+    if (type == Type.SMALLINT_TYPE) {
+      ColumnBuffer subset =
+          new ColumnBuffer(type, subNulls, Arrays.copyOfRange(shortVars, start, end));
+      shortVars = Arrays.copyOfRange(shortVars, end, size);
+      nulls = nulls.get(end, size);
+      size = shortVars.length;
+      return subset;
+    }
+    if (type == Type.INT_TYPE) {
+      ColumnBuffer subset =
+          new ColumnBuffer(type, subNulls, Arrays.copyOfRange(intVars, start, end));
+      intVars = Arrays.copyOfRange(intVars, end, size);
+      nulls = nulls.get(end, size);
+      size = intVars.length;
+      return subset;
+    }
+    if (type == Type.BIGINT_TYPE) {
+      ColumnBuffer subset =
+          new ColumnBuffer(type, subNulls, Arrays.copyOfRange(longVars, start, end));
+      longVars = Arrays.copyOfRange(longVars, end, size);
+      nulls = nulls.get(end, size);
+      size = longVars.length;
+      return subset;
+    }
+    if (type == Type.DOUBLE_TYPE) {
+      ColumnBuffer subset =
+          new ColumnBuffer(type, subNulls, Arrays.copyOfRange(doubleVars, start, end));
+      doubleVars = Arrays.copyOfRange(doubleVars, end, size);
+      nulls = nulls.get(end, size);
+      size = doubleVars.length;
+      return subset;
+    }
+    if (type == Type.BINARY_TYPE) {
+      ColumnBuffer subset = new ColumnBuffer(type, subNulls, binaryVars.subList(start, end));
+      binaryVars = binaryVars.subList(end, binaryVars.size());
+      nulls = nulls.get(end, size);
+      size = binaryVars.size();
+      return subset;
+    }
+    if (type == Type.STRING_TYPE) {
+      ColumnBuffer subset = new ColumnBuffer(type, subNulls, stringVars.subList(start, end));
+      stringVars = stringVars.subList(end, stringVars.size());
+      nulls = nulls.get(end, size);
+      size = stringVars.size();
+      return subset;
+    }
+    throw new IllegalStateException("invalid column type: " + type);
+  }
+
+  private static final byte[] MASKS = new byte[] {
+      0x01, 0x02, 0x04, 0x08, 0x10, 0x20, 0x40, (byte)0x80
+  };
+
+  private static BitSet toBitset(byte[] nulls) {
+    BitSet bitset = new BitSet();
+    int bits = nulls.length * 8;
+    for (int i = 0; i < bits; i++) {
+      bitset.set(i, (nulls[i / 8] & MASKS[i % 8]) != 0);
+    }
+    return bitset;
+  }
+
+  private static byte[] toBinary(BitSet bitset) {
+    byte[] nulls = new byte[1 + (bitset.length() / 8)];
+    for (int i = 0; i < bitset.length(); i++) {
+      nulls[i / 8] |= bitset.get(i) ? MASKS[i % 8] : 0;
+    }
+    return nulls;
+  }
+
+  public Type getType() {
+    return type;
+  }
+
+  @Override
+  public Object get(int index) {
+    if (nulls.get(index)) {
+      return null;
+    }
+    switch (type) {
+      case BOOLEAN_TYPE:
+        return boolVars[index];
+      case TINYINT_TYPE:
+        return byteVars[index];
+      case SMALLINT_TYPE:
+        return shortVars[index];
+      case INT_TYPE:
+        return intVars[index];
+      case BIGINT_TYPE:
+        return longVars[index];
+      case DOUBLE_TYPE:
+        return doubleVars[index];
+      case STRING_TYPE:
+        return stringVars.get(index);
+      case BINARY_TYPE:
+        return binaryVars.get(index).array();
+    }
+    return null;
+  }
+
+  @Override
+  public int size() {
+    return size;
+  }
+
+  public TColumn toTColumn() {
+    TColumn value = new TColumn();
+    ByteBuffer nullMasks = ByteBuffer.wrap(toBinary(nulls));
+    switch (type) {
+    case BOOLEAN_TYPE:
+      value.setBoolVal(new TBoolColumn(Booleans.asList(Arrays.copyOfRange(boolVars, 0, size)),
+          nullMasks));
+      break;
+    case TINYINT_TYPE:
+      value.setByteVal(new TByteColumn(Bytes.asList(Arrays.copyOfRange(byteVars, 0, size)),
+          nullMasks));
+      break;
+    case SMALLINT_TYPE:
+      value.setI16Val(new TI16Column(Shorts.asList(Arrays.copyOfRange(shortVars, 0, size)),
+          nullMasks));
+      break;
+    case INT_TYPE:
+      value.setI32Val(new TI32Column(Ints.asList(Arrays.copyOfRange(intVars, 0, size)), nullMasks));
+      break;
+    case BIGINT_TYPE:
+      value
+          .setI64Val(new TI64Column(Longs.asList(Arrays.copyOfRange(longVars, 0, size)), nullMasks));
+      break;
+    case DOUBLE_TYPE:
+      value.setDoubleVal(new TDoubleColumn(Doubles.asList(Arrays.copyOfRange(doubleVars, 0, size)),
+          nullMasks));
+      break;
+    case STRING_TYPE:
+      value.setStringVal(new TStringColumn(stringVars, nullMasks));
+      break;
+    case BINARY_TYPE:
+      value.setBinaryVal(new TBinaryColumn(binaryVars, nullMasks));
+      break;
+    }
+    return value;
+  }
+
+  private static final ByteBuffer EMPTY_BINARY = ByteBuffer.allocate(0);
+  private static final String EMPTY_STRING = "";
+
+  public void addValue(Object field) {
+    addValue(this.type, field);
+  }
+
+  public void addValue(Type type, Object field) {
+    switch (type) {
+    case BOOLEAN_TYPE:
+      nulls.set(size, field == null);
+      boolVars()[size] = field == null ? true : (Boolean) field;
+      break;
+    case TINYINT_TYPE:
+      nulls.set(size, field == null);
+      byteVars()[size] = field == null ? 0 : (Byte) field;
+      break;
+    case SMALLINT_TYPE:
+      nulls.set(size, field == null);
+      shortVars()[size] = field == null ? 0 : (Short) field;
+      break;
+    case INT_TYPE:
+      nulls.set(size, field == null);
+      intVars()[size] = field == null ? 0 : (Integer) field;
+      break;
+    case BIGINT_TYPE:
+      nulls.set(size, field == null);
+      longVars()[size] = field == null ? 0 : (Long) field;
+      break;
+    case FLOAT_TYPE:
+      nulls.set(size, field == null);
+      doubleVars()[size] = field == null ? 0 : new Double(field.toString());
+      break;
+    case DOUBLE_TYPE:
+      nulls.set(size, field == null);
+      doubleVars()[size] = field == null ? 0 : (Double) field;
+      break;
+    case BINARY_TYPE:
+      nulls.set(binaryVars.size(), field == null);
+      binaryVars.add(field == null ? EMPTY_BINARY : ByteBuffer.wrap((byte[]) field));
+      break;
+    default:
+      nulls.set(stringVars.size(), field == null);
+      stringVars.add(field == null ? EMPTY_STRING : String.valueOf(field));
+      break;
+    }
+    size++;
+  }
+
+  private boolean[] boolVars() {
+    if (boolVars.length == size) {
+      boolean[] newVars = new boolean[size << 1];
+      System.arraycopy(boolVars, 0, newVars, 0, size);
+      return boolVars = newVars;
+    }
+    return boolVars;
+  }
+
+  private byte[] byteVars() {
+    if (byteVars.length == size) {
+      byte[] newVars = new byte[size << 1];
+      System.arraycopy(byteVars, 0, newVars, 0, size);
+      return byteVars = newVars;
+    }
+    return byteVars;
+  }
+
+  private short[] shortVars() {
+    if (shortVars.length == size) {
+      short[] newVars = new short[size << 1];
+      System.arraycopy(shortVars, 0, newVars, 0, size);
+      return shortVars = newVars;
+    }
+    return shortVars;
+  }
+
+  private int[] intVars() {
+    if (intVars.length == size) {
+      int[] newVars = new int[size << 1];
+      System.arraycopy(intVars, 0, newVars, 0, size);
+      return intVars = newVars;
+    }
+    return intVars;
+  }
+
+  private long[] longVars() {
+    if (longVars.length == size) {
+      long[] newVars = new long[size << 1];
+      System.arraycopy(longVars, 0, newVars, 0, size);
+      return longVars = newVars;
+    }
+    return longVars;
+  }
+
+  private double[] doubleVars() {
+    if (doubleVars.length == size) {
+      double[] newVars = new double[size << 1];
+      System.arraycopy(doubleVars, 0, newVars, 0, size);
+      return doubleVars = newVars;
+    }
+    return doubleVars;
+  }
+}

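As a rough usage sketch (not part of the patch), the snippet below exercises the addValue/toTColumn path and makes the null-mask encoding concrete: toBinary packs one flag per row, least significant bit first within each byte, so NULLs at rows 1 and 3 of a five-row INT column produce the single mask byte 0x0A.

// Hypothetical usage of ColumnBuffer; the class and main() are only for illustration.
import java.util.Arrays;

import org.apache.hadoop.hive.serde2.thrift.ColumnBuffer;
import org.apache.hadoop.hive.serde2.thrift.Type;
import org.apache.hive.service.rpc.thrift.TColumn;

public class ColumnBufferExample {
  public static void main(String[] args) {
    ColumnBuffer col = new ColumnBuffer(Type.INT_TYPE);
    Integer[] values = { 7, null, 42, null, 9 };
    for (Integer v : values) {
      // Nulls are recorded in the BitSet; 0 is stored as a placeholder value.
      col.addValue(v);
    }

    TColumn tColumn = col.toTColumn();
    // Values: [7, 0, 42, 0, 9]; rows 1 and 3 are flagged null.
    System.out.println(tColumn.getI32Val().getValues());
    // Null mask: bit (i % 8) of byte (i / 8) is set when row i is NULL,
    // so this prints [10], i.e. the single byte 0x0A = 0b00001010.
    System.out.println(Arrays.toString(tColumn.getI32Val().getNulls()));
  }
}
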
http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftFormatter.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftFormatter.java b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftFormatter.java
new file mode 100644
index 0000000..a4c120e
--- /dev/null
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftFormatter.java
@@ -0,0 +1,40 @@
+package org.apache.hadoop.hive.serde2.thrift;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde2.FetchFormatter;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+
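+/**
+ * A FetchFormatter that converts each field of a row into a thrift-able object via
+ * SerDeUtils.toThriftPayload, honoring the protocol version configured under
+ * SerDeUtils.LIST_SINK_OUTPUT_PROTOCOL.
+ */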
+public class ThriftFormatter implements FetchFormatter<Object> {
+
+  int protocol;
+
+  @Override
+  public void initialize(Configuration hconf, Properties props) throws Exception {
+    protocol = hconf.getInt(SerDeUtils.LIST_SINK_OUTPUT_PROTOCOL, 0);
+  }
+
+  @Override
+  public Object convert(Object row, ObjectInspector rowOI) throws Exception {
+    StructObjectInspector structOI = (StructObjectInspector) rowOI;
+    List<? extends StructField> fields = structOI.getAllStructFieldRefs();
+    Object[] converted = new Object[fields.size()];
+    for (int i = 0 ; i < converted.length; i++) {
+      StructField fieldRef = fields.get(i);
+      Object field = structOI.getStructFieldData(row, fieldRef);
+      converted[i] = field == null ? null :
+          SerDeUtils.toThriftPayload(field, fieldRef.getFieldObjectInspector(), protocol);
+    }
+    return converted;
+  }
+
+  @Override
+  public void close() throws IOException {
+  }
+}
\ No newline at end of file

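A small, hypothetical driver (not part of the patch) shows the formatter in isolation: with a standard struct ObjectInspector, convert() returns one thrift-able value per field, each produced by SerDeUtils.toThriftPayload for the configured protocol version.

// Hypothetical driver for ThriftFormatter; the class name and schema are invented for the example.
import java.util.Arrays;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.thrift.ThriftFormatter;

public class ThriftFormatterExample {
  public static void main(String[] args) throws Exception {
    // Row schema (id int, name string) using the standard Java object inspectors.
    ObjectInspector rowOI = ObjectInspectorFactory.getStandardStructObjectInspector(
        Arrays.asList("id", "name"),
        Arrays.<ObjectInspector>asList(
            PrimitiveObjectInspectorFactory.javaIntObjectInspector,
            PrimitiveObjectInspectorFactory.javaStringObjectInspector));

    ThriftFormatter formatter = new ThriftFormatter();
    // list.sink.output.protocol is not set here, so the protocol version defaults to 0.
    formatter.initialize(new Configuration(), new Properties());

    // Under the standard struct inspector a row is simply a java.util.List of field values.
    Object[] converted = (Object[]) formatter.convert(Arrays.asList(1, "alice"), rowOI);
    System.out.println(Arrays.toString(converted));  // e.g. [1, alice]
  }
}
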
http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java
new file mode 100644
index 0000000..5c31974
--- /dev/null
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.thrift;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hive.service.rpc.thrift.TColumn;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This SerDe serializes the final output directly into thrift-able objects; use it only for final
+ * output result sets. It is used when HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS is set to
+ * true. It buffers incoming rows from FileSink until the configurable maximum buffer size is
+ * reached, or until all rows are done and FileSink.closeOp() is called.
+ */
+public class ThriftJDBCBinarySerDe extends AbstractSerDe {
+  public static final Logger LOG = LoggerFactory.getLogger(ThriftJDBCBinarySerDe.class.getName());
+  private List<String> columnNames;
+  private List<TypeInfo> columnTypes;
+  private ColumnBuffer[] columnBuffers;
+  private TypeInfo rowTypeInfo;
+  private ArrayList<Object> row;
+  private BytesWritable serializedBytesWritable = new BytesWritable();
+  private ByteStream.Output output = new ByteStream.Output();
+  private TProtocol protocol = new TCompactProtocol(new TIOStreamTransport(output));
+  private ThriftFormatter thriftFormatter = new ThriftFormatter();
+  private int MAX_BUFFERED_ROWS;
+  private int count;
+  private StructObjectInspector rowObjectInspector;
+
+
+  @Override
+  public void initialize(Configuration conf, Properties tbl) throws SerDeException {
+    MAX_BUFFERED_ROWS = HiveConf.getIntVar(conf,
+        HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE);
+    // Get column names and types
+    String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS);
+    String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
+    if (columnNameProperty.length() == 0) {
+      columnNames = new ArrayList<String>();
+    } else {
+      columnNames = Arrays.asList(columnNameProperty.split(","));
+    }
+    if (columnTypeProperty.length() == 0) {
+      columnTypes = new ArrayList<TypeInfo>();
+    } else {
+      columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+    }
+    rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
+    rowObjectInspector =
+        (StructObjectInspector) TypeInfoUtils
+            .getStandardWritableObjectInspectorFromTypeInfo(rowTypeInfo);
+
+    initializeRowAndColumns();
+    try {
+      thriftFormatter.initialize(conf, tbl);
+    } catch (Exception e) {
+      throw new SerDeException(e);
+    }
+  }
+
+  @Override
+  public Class<? extends Writable> getSerializedClass() {
+    return BytesWritable.class;
+  }
+
+  private Writable serializeBatch() throws SerDeException {
+    output.reset();
+    for (int i = 0; i < columnBuffers.length; i++) {
+      TColumn tColumn = columnBuffers[i].toTColumn();
+      try {
+        tColumn.write(protocol);
+      } catch (TException e) {
+        throw new SerDeException(e);
+      }
+    }
+    initializeRowAndColumns();
+    serializedBytesWritable.set(output.getData(), 0, output.getLength());
+    return serializedBytesWritable;
+  }
+
+  // Use columnNames to (re)initialize the reusable row object and the column buffers. The buffers
+  // are reinitialized after every flushed batch; otherwise the same rows would be serialized again
+  // when closeOp() triggers the final flush.
+  private void initializeRowAndColumns() {
+    row = new ArrayList<Object>(columnNames.size());
+    for (int i = 0; i < columnNames.size(); i++) {
+      row.add(null);
+    }
+    // Initialize column buffers
+    columnBuffers = new ColumnBuffer[columnNames.size()];
+    for (int i = 0; i < columnBuffers.length; i++) {
+      columnBuffers[i] = new ColumnBuffer(Type.getType(columnTypes.get(i)));
+    }
+  }
+
+  /**
+   * Buffer the incoming row; when the buffer is full, or a null row signals the final flush,
+   * write the buffered TColumn objects to the underlying TProtocol stream.
+   */
+  @Override
+  public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
+    // A null row means there are no more rows (closeOp() was called); the buffer being full is
+    // the other case that triggers a batch flush.
+    if (obj == null) {
+      return serializeBatch();
+    }
+    count += 1;
+    StructObjectInspector soi = (StructObjectInspector) objInspector;
+    List<? extends StructField> fields = soi.getAllStructFieldRefs();
+    try {
+      Object[] formattedRow = (Object[]) thriftFormatter.convert(obj, objInspector);
+      for (int i = 0; i < columnNames.size(); i++) {
+        columnBuffers[i].addValue(formattedRow[i]);
+      }
+    } catch (Exception e) {
+      throw new SerDeException(e);
+    }
+    if (count == MAX_BUFFERED_ROWS) {
+      count = 0;
+      return serializeBatch();
+    }
+    return null;
+  }
+
+  @Override
+  public SerDeStats getSerDeStats() {
+    return null;
+  }
+
+  /**
+   * Return the bytes from this writable blob.
+   * The caller of this method will eventually interpret these bytes using the Thrift protocol.
+   */
+  @Override
+  public Object deserialize(Writable blob) throws SerDeException {
+    return ((BytesWritable) blob).getBytes();
+  }
+
+  @Override
+  public ObjectInspector getObjectInspector() throws SerDeException {
+    return rowObjectInspector;
+  }
+
+}
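
The bytes produced by serializeBatch() are simply the column buffers written one after another as Thrift TColumn structs over TCompactProtocol, so a reader reverses exactly those steps. The sketch below is illustrative only; the real decoding happens wherever these result bytes are consumed on the HiveServer2 fetch path, and BatchDecoder, decode(), and the assumption that the caller already knows the column count are all invented for the example.

// Illustrative sketch, not the patch's code: decoding one serialized batch back into TColumns.
import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.List;

import org.apache.hive.service.rpc.thrift.TColumn;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TIOStreamTransport;

public class BatchDecoder {

  /** Reads numColumns TColumn structs from the bytes written by ThriftJDBCBinarySerDe. */
  public static List<TColumn> decode(byte[] batch, int numColumns) throws Exception {
    TProtocol protocol =
        new TCompactProtocol(new TIOStreamTransport(new ByteArrayInputStream(batch)));
    List<TColumn> columns = new ArrayList<TColumn>(numColumns);
    for (int i = 0; i < numColumns; i++) {
      TColumn column = new TColumn();
      column.read(protocol);  // mirrors tColumn.write(protocol) in serializeBatch()
      columns.add(column);
    }
    return columns;
  }
}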