You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by vg...@apache.org on 2016/04/22 21:23:52 UTC
[3/3] hive git commit: HIVE-12049: HiveServer2: Provide an option to
write serialized thrift objects in final tasks (Rohit Dholakia reviewed by
Ashutosh Chauhan, Gopal Vijayaraghavan, Lefty Leverenz, Vaibhav Gumashta)
HIVE-12049: HiveServer2: Provide an option to write serialized thrift objects in final tasks (Rohit Dholakia reviewed by Ashutosh Chauhan, Gopal Vijayaraghavan, Lefty Leverenz, Vaibhav Gumashta)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/fb230f9d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/fb230f9d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/fb230f9d
Branch: refs/heads/master
Commit: fb230f9df5b7c990c80326671d9975a6f05e1600
Parents: 145e253
Author: Vaibhav Gumashta <vg...@hortonworks.com>
Authored: Fri Apr 22 12:22:55 2016 -0700
Committer: Vaibhav Gumashta <vg...@hortonworks.com>
Committed: Fri Apr 22 12:22:55 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/hive/beeline/Commands.java | 2 +-
.../org/apache/hadoop/hive/conf/HiveConf.java | 11 +
.../apache/hive/jdbc/TestJdbcWithMiniHS2.java | 108 ++++-
.../org/apache/hive/jdbc/HiveBaseResultSet.java | 2 +-
.../apache/hive/jdbc/HiveResultSetMetaData.java | 2 +-
.../org/apache/hive/jdbc/HiveStatement.java | 16 +-
.../java/org/apache/hive/jdbc/JdbcColumn.java | 2 +-
ql/pom.xml | 6 +
.../java/org/apache/hadoop/hive/ql/Driver.java | 19 +
.../hive/ql/exec/DefaultFetchFormatter.java | 77 ----
.../hadoop/hive/ql/exec/FetchFormatter.java | 71 ---
.../apache/hadoop/hive/ql/exec/FetchTask.java | 3 +-
.../hadoop/hive/ql/exec/FileSinkOperator.java | 27 +-
.../hadoop/hive/ql/exec/ListSinkOperator.java | 11 +-
.../org/apache/hadoop/hive/ql/parse/QB.java | 5 +
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 22 +-
.../hadoop/hive/ql/parse/TaskCompiler.java | 62 ++-
.../apache/hadoop/hive/ql/plan/FetchWork.java | 10 +
.../hadoop/hive/ql/plan/FileSinkDesc.java | 9 +
.../apache/hadoop/hive/ql/plan/PlanUtils.java | 10 +-
serde/pom.xml | 5 +
.../hive/serde2/DefaultFetchFormatter.java | 73 +++
.../hadoop/hive/serde2/FetchFormatter.java | 37 ++
.../hadoop/hive/serde2/NoOpFetchFormatter.java | 48 ++
.../apache/hadoop/hive/serde2/SerDeUtils.java | 3 +-
.../hadoop/hive/serde2/thrift/ColumnBuffer.java | 439 +++++++++++++++++++
.../hive/serde2/thrift/ThriftFormatter.java | 40 ++
.../serde2/thrift/ThriftJDBCBinarySerDe.java | 178 ++++++++
.../apache/hadoop/hive/serde2/thrift/Type.java | 438 ++++++++++++++++++
.../hadoop/hive/serde2/typeinfo/TypeInfo.java | 14 +-
service-rpc/if/TCLIService.thrift | 2 +
.../gen/thrift/gen-cpp/TCLIService_types.cpp | 44 ++
.../src/gen/thrift/gen-cpp/TCLIService_types.h | 20 +-
.../apache/hive/service/rpc/thrift/TRowSet.java | 222 +++++++++-
service-rpc/src/gen/thrift/gen-php/Types.php | 46 ++
.../src/gen/thrift/gen-py/TCLIService/ttypes.py | 28 +-
.../gen/thrift/gen-rb/t_c_l_i_service_types.rb | 6 +-
.../org/apache/hive/service/cli/Column.java | 434 ------------------
.../apache/hive/service/cli/ColumnBasedSet.java | 84 +++-
.../hive/service/cli/ColumnDescriptor.java | 12 +-
.../apache/hive/service/cli/ColumnValue.java | 1 +
.../apache/hive/service/cli/RowSetFactory.java | 17 +-
.../apache/hive/service/cli/TableSchema.java | 4 +-
.../java/org/apache/hive/service/cli/Type.java | 348 ---------------
.../apache/hive/service/cli/TypeDescriptor.java | 1 +
.../cli/operation/GetCatalogsOperation.java | 2 +-
.../cli/operation/GetColumnsOperation.java | 4 +-
.../cli/operation/GetFunctionsOperation.java | 8 +-
.../cli/operation/GetSchemasOperation.java | 5 +-
.../cli/operation/GetTableTypesOperation.java | 9 +-
.../cli/operation/GetTablesOperation.java | 2 +-
.../cli/operation/GetTypeInfoOperation.java | 4 +-
.../cli/operation/HiveCommandOperation.java | 4 +-
.../service/cli/operation/OperationManager.java | 12 +-
.../service/cli/operation/SQLOperation.java | 20 +-
.../service/cli/session/HiveSessionImpl.java | 8 +-
.../service/cli/thrift/ThriftCLIService.java | 2 +-
.../apache/hive/service/cli/CLIServiceTest.java | 2 +-
.../org/apache/hive/service/cli/TestColumn.java | 14 +-
59 files changed, 2066 insertions(+), 1049 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/beeline/src/java/org/apache/hive/beeline/Commands.java
----------------------------------------------------------------------
diff --git a/beeline/src/java/org/apache/hive/beeline/Commands.java b/beeline/src/java/org/apache/hive/beeline/Commands.java
index 22e2066..0178333 100644
--- a/beeline/src/java/org/apache/hive/beeline/Commands.java
+++ b/beeline/src/java/org/apache/hive/beeline/Commands.java
@@ -1179,7 +1179,7 @@ public class Commands {
private void showRemainingLogsIfAny(Statement statement) {
if (statement instanceof HiveStatement) {
HiveStatement hiveStatement = (HiveStatement) statement;
- List<String> logs;
+ List<String> logs = null;
do {
try {
logs = hiveStatement.getQueryLog();
http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 5cf1609..c52b9d9 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2037,6 +2037,7 @@ public class HiveConf extends Configuration {
new TimeValidator(TimeUnit.SECONDS),
"Number of seconds a request will wait to acquire the compile lock before giving up. " +
"Setting it to 0s disables the timeout."),
+
// HiveServer2 WebUI
HIVE_SERVER2_WEBUI_BIND_HOST("hive.server2.webui.host", "0.0.0.0", "The host address the HiveServer2 WebUI will listen on"),
HIVE_SERVER2_WEBUI_PORT("hive.server2.webui.port", 10002, "The port the HiveServer2 WebUI will listen on. This can be"
@@ -2167,6 +2168,7 @@ public class HiveConf extends Configuration {
new TimeValidator(TimeUnit.SECONDS),
"Keepalive time (in seconds) for an idle worker thread. When the number of workers exceeds min workers, " +
"excessive threads are killed after this time interval."),
+
// Configuration for async thread pool in SessionManager
HIVE_SERVER2_ASYNC_EXEC_THREADS("hive.server2.async.exec.threads", 100,
"Number of threads in the async thread pool for HiveServer2"),
@@ -2330,6 +2332,14 @@ public class HiveConf extends Configuration {
HIVE_SERVER2_THRIFT_CLIENT_PASSWORD("hive.server2.thrift.client.password", "anonymous","Password to use against " +
"thrift client"),
+ // ResultSet serialization settings
+ HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS("hive.server2.thrift.resultset.serialize.in.tasks", false,
+ "Whether we should serialize the Thrift structures used in JDBC ResultSet RPC in task nodes.\n " +
+ "We use SequenceFile and ThriftJDBCBinarySerDe to read and write the final results if this is true."),
+ // TODO: Make use of this config to configure fetch size
+ HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE("hive.server2.thrift.resultset.max.fetch.size", 1000,
+ "Max number of rows sent in one Fetch RPC call by the server to the client."),
+
HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist", "set,reset,dfs,add,list,delete,reload,compile",
"Comma separated list of non-SQL Hive commands users are authorized to execute"),
@@ -3646,6 +3656,7 @@ public class HiveConf extends Configuration {
ConfVars.HIVE_RESULTSET_USE_UNIQUE_COLUMN_NAMES.varname,
ConfVars.HIVE_STATS_COLLECT_PART_LEVEL_STATS.varname,
ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LEVEL.varname,
+ ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS.varname,
ConfVars.HIVE_SUPPORT_SQL11_RESERVED_KEYWORDS.varname,
ConfVars.JOB_DEBUG_CAPTURE_STACKTRACES.varname,
ConfVars.JOB_DEBUG_TIMEOUT.varname,
http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
index 10c8ff2..815ccfa 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
@@ -431,6 +431,112 @@ public class TestJdbcWithMiniHS2 {
}
}
+ private void setSerializeInTasksInConf(HiveConf conf) {
+ conf.setBoolean("hive.server2.thrift.resultset.serialize.in.tasks", true);
+ conf.setInt("hive.server2.thrift.resultset.max.fetch.size", 1000);
+ }
+
+ @Test
+ public void testMetadataQueriesWithSerializeThriftInTasks() throws Exception {
+ //stop HiveServer2
+ if (miniHS2.isStarted()) {
+ miniHS2.stop();
+ }
+
+ HiveConf conf = new HiveConf();
+ String userName;
+ setSerializeInTasksInConf(conf);
+ miniHS2 = new MiniHS2(conf);
+ Map<String, String> confOverlay = new HashMap<String, String>();
+ miniHS2.start(confOverlay);
+
+ userName = System.getProperty("user.name");
+ hs2Conn = getConnection(miniHS2.getJdbcURL(), userName, "password");
+ Statement stmt = hs2Conn.createStatement();
+ stmt.execute("drop table if exists testThriftSerializeShow");
+ stmt.execute("create table testThriftSerializeShow (a int)");
+ ResultSet rs = stmt.executeQuery("show tables");
+ assertTrue(rs.next());
+ stmt.execute("describe testThriftSerializeShow");
+ stmt.execute("explain select a from testThriftSerializeShow");
+ stmt.execute("drop table testThriftSerializeShow");
+ stmt.close();
+ }
+
+ @Test
+ public void testSelectThriftSerializeInTasks() throws Exception {
+ if (miniHS2.isStarted()) {
+ miniHS2.stop();
+ }
+
+ HiveConf conf = new HiveConf();
+ String userName;
+ setSerializeInTasksInConf(conf);
+ miniHS2 = new MiniHS2(conf);
+ Map<String, String> confOverlay = new HashMap<String, String>();
+ miniHS2.start(confOverlay);
+
+ userName = System.getProperty("user.name");
+ hs2Conn = getConnection(miniHS2.getJdbcURL(), userName, "password");
+ Statement stmt = hs2Conn.createStatement();
+
+ stmt.execute("drop table if exists testSelectThriftOrders");
+ stmt.execute("drop table if exists testSelectThriftCustomers");
+ stmt.execute("create table testSelectThriftOrders (orderid int, orderdate string, customerid int)");
+ stmt.execute("create table testSelectThriftCustomers (customerid int, customername string, customercountry string)");
+ stmt.execute("insert into testSelectThriftOrders values (1, '2015-09-09', 123), (2, '2015-10-10', 246), (3, '2015-11-11', 356)");
+ stmt.execute("insert into testSelectThriftCustomers values (123, 'David', 'America'), (246, 'John', 'Canada'), (356, 'Mary', 'CostaRica')");
+ ResultSet countOrders = stmt.executeQuery("select count(*) from testSelectThriftOrders");
+ while (countOrders.next()) {
+ assertEquals(3, countOrders.getInt(1));
+ }
+ ResultSet maxOrders = stmt.executeQuery("select max(customerid) from testSelectThriftCustomers");
+ while (maxOrders.next()) {
+ assertEquals(356, maxOrders.getInt(1));
+ }
+ stmt.execute("drop table testSelectThriftOrders");
+ stmt.execute("drop table testSelectThriftCustomers");
+ stmt.close();
+ }
+
+ @Test
+ public void testJoinThriftSerializeInTasks() throws Exception {
+ //stop HiveServer2
+ if (miniHS2.isStarted()) {
+ miniHS2.stop();
+ }
+ HiveConf conf = new HiveConf();
+ String userName;
+
+ setSerializeInTasksInConf(conf);
+
+ miniHS2 = new MiniHS2(conf);
+ Map<String, String> confOverlay = new HashMap<String, String>();
+ miniHS2.start(confOverlay);
+
+ userName = System.getProperty("user.name");
+ hs2Conn = getConnection(miniHS2.getJdbcURL(), userName, "password");
+ Statement stmt = hs2Conn.createStatement();
+ stmt.execute("drop table if exists testThriftJoinOrders");
+ stmt.execute("drop table if exists testThriftJoinCustomers");
+ stmt.execute("create table testThriftJoinOrders (orderid int, orderdate string, customerid int)");
+ stmt.execute("create table testThriftJoinCustomers (customerid int, customername string, customercountry string)");
+ stmt.execute("insert into testThriftJoinOrders values (1, '2015-09-09', 123), (2, '2015-10-10', 246), (3, '2015-11-11', 356)");
+ stmt.execute("insert into testThriftJoinCustomers values (123, 'David', 'America'), (246, 'John', 'Canada'), (356, 'Mary', 'CostaRica')");
+ ResultSet joinResultSet = stmt.executeQuery("select testThriftJoinOrders.orderid, testThriftJoinCustomers.customername from testThriftJoinOrders inner join testThriftJoinCustomers where testThriftJoinOrders.customerid=testThriftJoinCustomers.customerid");
+ Map<Integer, String> expectedResult = new HashMap<Integer, String>();
+ expectedResult.put(1, "David");
+ expectedResult.put(2, "John");
+ expectedResult.put(3, "Mary");
+ for (int i = 1; i < 4; i++) {
+ assertTrue(joinResultSet.next());
+ assertEquals(joinResultSet.getString(2), expectedResult.get(i));
+ }
+ stmt.execute("drop table testThriftJoinOrders");
+ stmt.execute("drop table testThriftJoinCustomers");
+ stmt.close();
+ }
+
/**
* Tests the creation of the 3 scratch dirs: hdfs, local, downloaded resources (which is also local).
* 1. Test with doAs=false: open a new JDBC session and verify the presence of directories/permissions
@@ -810,4 +916,4 @@ public class TestJdbcWithMiniHS2 {
}
return -1;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/jdbc/src/java/org/apache/hive/jdbc/HiveBaseResultSet.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveBaseResultSet.java b/jdbc/src/java/org/apache/hive/jdbc/HiveBaseResultSet.java
index 88ba853..93f093f 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/HiveBaseResultSet.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/HiveBaseResultSet.java
@@ -46,8 +46,8 @@ import java.util.Map;
import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
+import org.apache.hadoop.hive.serde2.thrift.Type;
import org.apache.hive.service.cli.TableSchema;
-import org.apache.hive.service.cli.Type;
/**
* Data independent base class which implements the common part of
http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/jdbc/src/java/org/apache/hive/jdbc/HiveResultSetMetaData.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveResultSetMetaData.java b/jdbc/src/java/org/apache/hive/jdbc/HiveResultSetMetaData.java
index 16a0894..f6c38d8 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/HiveResultSetMetaData.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/HiveResultSetMetaData.java
@@ -22,7 +22,7 @@ import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.List;
-import org.apache.hive.service.cli.Type;
+import org.apache.hadoop.hive.serde2.thrift.Type;
/**
* HiveResultSetMetaData.
http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
index f5b9672..3cc6b74 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
@@ -35,6 +35,7 @@ import org.apache.hive.service.rpc.thrift.TGetOperationStatusReq;
import org.apache.hive.service.rpc.thrift.TGetOperationStatusResp;
import org.apache.hive.service.rpc.thrift.TOperationHandle;
import org.apache.hive.service.rpc.thrift.TSessionHandle;
+import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -881,15 +882,22 @@ public class HiveStatement implements java.sql.Statement {
}
} catch (SQLException e) {
throw e;
+ } catch (TException e) {
+ throw new SQLException("Error when getting query log: " + e, e);
} catch (Exception e) {
throw new SQLException("Error when getting query log: " + e, e);
}
- RowSet rowSet = RowSetFactory.create(tFetchResultsResp.getResults(),
- connection.getProtocol());
- for (Object[] row : rowSet) {
- logs.add(String.valueOf(row[0]));
+ try {
+ RowSet rowSet;
+ rowSet = RowSetFactory.create(tFetchResultsResp.getResults(), connection.getProtocol());
+ for (Object[] row : rowSet) {
+ logs.add(String.valueOf(row[0]));
+ }
+ } catch (TException e) {
+ throw new SQLException("Error building result set for query log: " + e, e);
}
+
return logs;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/jdbc/src/java/org/apache/hive/jdbc/JdbcColumn.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/JdbcColumn.java b/jdbc/src/java/org/apache/hive/jdbc/JdbcColumn.java
index 691fd0e..5aed679 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/JdbcColumn.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/JdbcColumn.java
@@ -27,7 +27,7 @@ import java.sql.Types;
import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hive.service.cli.Type;
+import org.apache.hadoop.hive.serde2.thrift.Type;
/**
http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/pom.xml
----------------------------------------------------------------------
diff --git a/ql/pom.xml b/ql/pom.xml
index ebb9599..aaa3271 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -70,6 +70,11 @@
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
+ <artifactId>hive-service-rpc</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
<artifactId>hive-llap-client</artifactId>
<version>${project.version}</version>
</dependency>
@@ -803,6 +808,7 @@
<include>org.apache.hive:hive-serde</include>
<include>org.apache.hive:hive-llap-client</include>
<include>org.apache.hive:hive-metastore</include>
+ <include>org.apache.hive:hive-service-rpc</include>
<include>com.esotericsoftware:kryo-shaded</include>
<include>com.esotericsoftware:minlog</include>
<include>org.objenesis:objenesis</include>
http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 65744ac..48fb060 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -108,6 +108,7 @@ import org.apache.hadoop.hive.ql.session.OperationLog.LoggingLevel;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.thrift.ThriftJDBCBinarySerDe;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
@@ -932,6 +933,13 @@ public class Driver implements CommandProcessor {
return plan;
}
+ /**
+ * @return The current FetchTask associated with the Driver's plan, if any.
+ */
+ public FetchTask getFetchTask() {
+ return fetchTask;
+ }
+
// Write the current set of valid transactions into the conf file so that it can be read by
// the input format.
private void recordValidTxns() throws LockException {
@@ -1880,6 +1888,17 @@ public class Driver implements CommandProcessor {
throw new IOException("FAILED: Operation cancelled");
}
if (isFetchingTable()) {
+ /**
+ * If result set serialization to Thrift objects is enabled and the destination table is
+ * actually written with ThriftJDBCBinarySerDe, read only one row from the output sequence
+ * file per fetch, because each row there is a serialized blob holding a batch of result rows.
+ */
+ if (fetchTask.getWork().isHiveServerQuery() && HiveConf.getBoolVar(conf,
+ HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS)
+ && (fetchTask.getTblDesc().getSerdeClassName()
+ .equalsIgnoreCase(ThriftJDBCBinarySerDe.class.getName()))) {
+ maxRows = 1;
+ }
fetchTask.setMaxRows(maxRows);
return fetchTask.fetch(res);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultFetchFormatter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultFetchFormatter.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultFetchFormatter.java
deleted file mode 100644
index b8be3a5..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultFetchFormatter.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT;
-import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_NULL_FORMAT;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.common.JavaUtils;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.serde2.DelimitedJSONSerDe;
-import org.apache.hadoop.hive.serde2.SerDe;
-import org.apache.hadoop.hive.serde2.SerDeUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hive.common.util.ReflectionUtil;
-
-/**
- * serialize row by user specified serde and call toString() to make string type result
- */
-public class DefaultFetchFormatter<T> implements FetchFormatter<String> {
-
- private SerDe mSerde;
-
- @Override
- public void initialize(Configuration hconf, Properties props) throws HiveException {
- try {
- mSerde = initializeSerde(hconf, props);
- } catch (Exception e) {
- throw new HiveException(e);
- }
- }
-
- private SerDe initializeSerde(Configuration conf, Properties props) throws Exception {
- String serdeName = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEFETCHOUTPUTSERDE);
- Class<? extends SerDe> serdeClass = Class.forName(serdeName, true,
- Utilities.getSessionSpecifiedClassLoader()).asSubclass(SerDe.class);
- // cast only needed for Hadoop 0.17 compatibility
- SerDe serde = ReflectionUtil.newInstance(serdeClass, null);
-
- Properties serdeProps = new Properties();
- if (serde instanceof DelimitedJSONSerDe) {
- serdeProps.put(SERIALIZATION_FORMAT, props.getProperty(SERIALIZATION_FORMAT));
- serdeProps.put(SERIALIZATION_NULL_FORMAT, props.getProperty(SERIALIZATION_NULL_FORMAT));
- }
- SerDeUtils.initializeSerDe(serde, conf, serdeProps, null);
- return serde;
- }
-
- @Override
- public String convert(Object row, ObjectInspector rowOI) throws Exception {
- return mSerde.serialize(row, rowOI).toString();
- }
-
- @Override
- public void close() throws IOException {
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchFormatter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchFormatter.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchFormatter.java
deleted file mode 100644
index c2ed0d6..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchFormatter.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.serde2.SerDeUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-
-/**
- * internal-use only
- *
- * Used in ListSinkOperator for formatting final output
- */
-public interface FetchFormatter<T> extends Closeable {
-
- void initialize(Configuration hconf, Properties props) throws Exception;
-
- T convert(Object row, ObjectInspector rowOI) throws Exception;
-
- public static class ThriftFormatter implements FetchFormatter<Object> {
-
- int protocol;
-
- @Override
- public void initialize(Configuration hconf, Properties props) throws Exception {
- protocol = hconf.getInt(ListSinkOperator.OUTPUT_PROTOCOL, 0);
- }
-
- @Override
- public Object convert(Object row, ObjectInspector rowOI) throws Exception {
- StructObjectInspector structOI = (StructObjectInspector) rowOI;
- List<? extends StructField> fields = structOI.getAllStructFieldRefs();
-
- Object[] converted = new Object[fields.size()];
- for (int i = 0 ; i < converted.length; i++) {
- StructField fieldRef = fields.get(i);
- Object field = structOI.getStructFieldData(row, fieldRef);
- converted[i] = field == null ? null :
- SerDeUtils.toThriftPayload(field, fieldRef.getFieldObjectInspector(), protocol);
- }
- return converted;
- }
-
- @Override
- public void close() throws IOException {
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
index 0b0c336..b96ea04 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
@@ -48,12 +48,10 @@ import org.apache.hadoop.util.StringUtils;
**/
public class FetchTask extends Task<FetchWork> implements Serializable {
private static final long serialVersionUID = 1L;
-
private int maxRows = 100;
private FetchOperator fetch;
private ListSinkOperator sink;
private int totalRows;
-
private static transient final Logger LOG = LoggerFactory.getLogger(FetchTask.class);
public FetchTask() {
@@ -186,4 +184,5 @@ public class FetchTask extends Task<FetchWork> implements Serializable {
fetch.clearFetchContext();
}
}
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index ec6381b..3ec63ee 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.SubStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.serde2.thrift.ThriftJDBCBinarySerDe;
import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyShim;
import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyValue;
import org.apache.hadoop.hive.shims.ShimLoader;
@@ -122,7 +123,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
protected transient long numRows = 0;
protected transient long cntr = 1;
protected transient long logEveryNRows = 0;
-
+ protected transient int rowIndex = 0;
/**
* Counters.
*/
@@ -374,7 +375,6 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
// half of the script.timeout but less than script.timeout, we will still
// be able to report progress.
timeOut = hconf.getInt("mapred.healthChecker.script.timeout", 600000) / 2;
-
if (hconf instanceof JobConf) {
jc = (JobConf) hconf;
} else {
@@ -656,6 +656,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
protected Writable recordValue;
+
@Override
public void process(Object row, int tag) throws HiveException {
/* Create list bucketing sub-directory only if stored-as-directories is on. */
@@ -717,8 +718,12 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
} else {
fpaths = fsp;
}
- // use SerDe to serialize r, and write it out
recordValue = serializer.serialize(row, inputObjInspectors[0]);
+ // If the serializer is ThriftJDBCBinarySerDe, recordValue is null while its row buffer is not yet full
+ // (the SerDe itself keeps track of the buffer size), so there is nothing to write for this row.
+ if (recordValue == null) {
+ return;
+ }
}
rowOutWriters = fpaths.outWriters;
@@ -745,6 +750,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
LOG.info(toString() + ": records written - " + numRows);
}
+ // This should always be 0 for the final result file
int writerOffset = findWriterOffset(row);
// This if/else chain looks ugly in the inner loop, but given that it will be 100% the same
// for a given operator branch prediction should work quite nicely on it.
@@ -1012,9 +1018,22 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
lastProgressReport = System.currentTimeMillis();
if (!abort) {
+ // If the serializer is ThriftJDBCBinarySerDe, it buffers rows up to a limit (hive.server2.thrift.resultset.max.fetch.size)
+ // and serializes the whole batch once the buffer is full; serialize() returns null while the buffer is not full
+ // (ThriftJDBCBinarySerDe keeps track of the buffer size). On close, the result of serialize(null, ...) is written out, presumably flushing any rows still buffered.
+ if (conf.isHiveServerQuery() && HiveConf.getBoolVar(hconf,
+ HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS) &&
+ serializer.getClass().getName().equalsIgnoreCase(ThriftJDBCBinarySerDe.class.getName())) {
+ try {
+ recordValue = serializer.serialize(null, inputObjInspectors[0]);
+ rowOutWriters = fpaths.outWriters;
+ rowOutWriters[0].write(recordValue);
+ } catch (SerDeException | IOException e) {
+ throw new HiveException(e);
+ }
+ }
for (FSPaths fsp : valToPaths.values()) {
fsp.closeWriters(abort);
-
// before closing the operator check if statistics gathering is requested
// and is provided by record writer. this is different from the statistics
// gathering done in processOp(). In processOp(), for each row added
http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java
index b081cd0..9bf363c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java
@@ -27,6 +27,9 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.ListSinkDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.DefaultFetchFormatter;
+import org.apache.hadoop.hive.serde2.FetchFormatter;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.util.ReflectionUtils;
/**
@@ -34,10 +37,6 @@ import org.apache.hadoop.util.ReflectionUtils;
* and finally arrives to this operator.
*/
public class ListSinkOperator extends Operator<ListSinkDesc> {
-
- public static final String OUTPUT_FORMATTER = "output.formatter";
- public static final String OUTPUT_PROTOCOL = "output.protocol";
-
private transient List res;
private transient FetchFormatter fetcher;
private transient int numRows;
@@ -62,7 +61,7 @@ public class ListSinkOperator extends Operator<ListSinkDesc> {
}
private FetchFormatter initializeFetcher(Configuration conf) throws Exception {
- String formatterName = conf.get(OUTPUT_FORMATTER);
+ String formatterName = conf.get(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER);
FetchFormatter fetcher;
if (formatterName != null && !formatterName.isEmpty()) {
Class<? extends FetchFormatter> fetcherClass = Class.forName(formatterName, true,
@@ -71,12 +70,10 @@ public class ListSinkOperator extends Operator<ListSinkDesc> {
} else {
fetcher = new DefaultFetchFormatter();
}
-
// selectively used by fetch formatter
Properties props = new Properties();
props.put(serdeConstants.SERIALIZATION_FORMAT, "" + Utilities.tabCode);
props.put(serdeConstants.SERIALIZATION_NULL_FORMAT, getConf().getSerializationNullFormat());
-
fetcher.initialize(conf, props);
return fetcher;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java
index cf3bbf0..de7b151 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java
@@ -263,6 +263,11 @@ public class QB {
this.isQuery = isQuery;
}
+ /**
+ * Returns the isQuery flag. The flag is set to true in
+ * SemanticAnalyzer.getMetadataForDestFile if the destination is a file and the query is not CTAS.
+ *
+ * @return the current value of the isQuery flag
+ */
public boolean getIsQuery() {
return isQuery;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 96df189..005b53f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -203,6 +203,7 @@ import org.apache.hadoop.hive.ql.util.ResourceDownloader;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe;
+import org.apache.hadoop.hive.serde2.NoOpFetchFormatter;
import org.apache.hadoop.hive.serde2.NullStructSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
@@ -214,6 +215,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.thrift.ThriftJDBCBinarySerDe;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
@@ -6834,8 +6836,23 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
if (tblDesc == null) {
if (qb.getIsQuery()) {
- String fileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
- table_desc = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, fileFormat);
+ String fileFormat;
+ if (SessionState.get().isHiveServerQuery() &&
+ conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS)) {
+ fileFormat = "SequenceFile";
+ HiveConf.setVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT, fileFormat);
+ table_desc=
+ PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, fileFormat,
+ ThriftJDBCBinarySerDe.class);
+ // Set the fetch formatter to be a no-op for the ListSinkOperator, since we'll
+ // write out formatted thrift objects to SequenceFile
+ conf.set(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER, NoOpFetchFormatter.class.getName());
+ } else {
+ fileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
+ table_desc =
+ PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, fileFormat,
+ LazySimpleSerDe.class);
+ }
} else {
table_desc = PlanUtils.getDefaultTableDesc(qb.getDirectoryDesc(), cols, colTypes);
}
@@ -6907,6 +6924,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
dpCtx,
dest_path);
+ fileSinkDesc.setHiveServerQuery(SessionState.get().isHiveServerQuery());
// If this is an insert, update, or delete on an ACID table then mark that so the
// FileSinkOperator knows how to properly write to it.
if (destTableIsAcid) {
http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index f7d7a40..75ca5f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -58,7 +58,15 @@ import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.DefaultFetchFormatter;
+import org.apache.hadoop.hive.serde2.NoOpFetchFormatter;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.thrift.ThriftFormatter;
+import org.apache.hadoop.hive.serde2.thrift.ThriftJDBCBinarySerDe;
import com.google.common.collect.Interner;
import com.google.common.collect.Interners;
@@ -97,6 +105,20 @@ public abstract class TaskCompiler {
int outerQueryLimit = pCtx.getQueryProperties().getOuterQueryLimit();
if (pCtx.getFetchTask() != null) {
+ if (pCtx.getFetchTask().getTblDesc() == null) {
+ return;
+ }
+ pCtx.getFetchTask().getWork().setHiveServerQuery(SessionState.get().isHiveServerQuery());
+ TableDesc resultTab = pCtx.getFetchTask().getTblDesc();
+ // If the serializer is ThriftJDBCBinarySerDe, then it requires that NoOpFetchFormatter be used. But when it isn't,
+ // then either the ThriftFormatter or the DefaultFetchFormatter should be used.
+ if (!resultTab.getSerdeClassName().equalsIgnoreCase(ThriftJDBCBinarySerDe.class.getName())) {
+ if (SessionState.get().isHiveServerQuery()) {
+ conf.set(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER,ThriftFormatter.class.getName());
+ } else {
+ conf.set(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER, DefaultFetchFormatter.class.getName());
+ }
+ }
return;
}
@@ -117,13 +139,34 @@ public abstract class TaskCompiler {
String cols = loadFileDesc.getColumns();
String colTypes = loadFileDesc.getColumnTypes();
+ String resFileFormat;
TableDesc resultTab = pCtx.getFetchTableDesc();
if (resultTab == null) {
- String resFileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
- resultTab = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat);
+ resFileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
+ if (SessionState.get().isHiveServerQuery() && (conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS))
+ && (resFileFormat.equalsIgnoreCase("SequenceFile"))) {
+ resultTab =
+ PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat,
+ ThriftJDBCBinarySerDe.class);
+ // Set the fetch formatter to be a no-op for the ListSinkOperator, since we'll
+ // read formatted thrift objects from the output SequenceFile written by Tasks.
+ conf.set(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER, NoOpFetchFormatter.class.getName());
+ } else {
+ resultTab =
+ PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat,
+ LazySimpleSerDe.class);
+ }
+ } else {
+ if (resultTab.getProperties().getProperty(serdeConstants.SERIALIZATION_LIB)
+ .equalsIgnoreCase(ThriftJDBCBinarySerDe.class.getName())) {
+ // Set the fetch formatter to be a no-op for the ListSinkOperator, since we'll
+ // read formatted thrift objects from the output SequenceFile written by Tasks.
+ conf.set(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER, NoOpFetchFormatter.class.getName());
+ }
}
FetchWork fetch = new FetchWork(loadFileDesc.getSourcePath(), resultTab, outerQueryLimit);
+ fetch.setHiveServerQuery(SessionState.get().isHiveServerQuery());
fetch.setSource(pCtx.getFetchSource());
fetch.setSink(pCtx.getFetchSink());
@@ -322,8 +365,19 @@ public abstract class TaskCompiler {
String cols = loadFileWork.get(0).getColumns();
String colTypes = loadFileWork.get(0).getColumnTypes();
- String resFileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
- TableDesc resultTab = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat);
+ String resFileFormat;
+ TableDesc resultTab;
+ if (SessionState.get().isHiveServerQuery() && conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS)) {
+ resFileFormat = "SequenceFile";
+ resultTab =
+ PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat,
+ ThriftJDBCBinarySerDe.class);
+ } else {
+ resFileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
+ resultTab =
+ PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat,
+ LazySimpleSerDe.class);
+ }
fetch = new FetchWork(loadFileWork.get(0).getSourcePath(), resultTab, outerQueryLimit);
http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java
index d68c64c..8ea6440 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java
@@ -63,6 +63,16 @@ public class FetchWork implements Serializable {
*/
private String serializationNullFormat = "NULL";
+ // Copied by TaskCompiler from SessionState.get().isHiveServerQuery() when the fetch work is built.
+ private boolean isHiveServerQuery;
+
+ /**
+ * @return the flag last stored by {@link #setHiveServerQuery(boolean)} (false by default)
+ */
+ public boolean isHiveServerQuery() {
+ return isHiveServerQuery;
+ }
+
+ /**
+ * @param isHiveServerQuery value to record; callers in this patch pass
+ * SessionState.get().isHiveServerQuery()
+ */
+ public void setHiveServerQuery(boolean isHiveServerQuery) {
+ this.isHiveServerQuery = isHiveServerQuery;
+ }
+
public FetchWork() {
}
http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index 07fd2dc..0064fca 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -97,6 +97,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
private transient Table table;
private Path destPath;
+ private boolean isHiveServerQuery;
public FileSinkDesc() {
}
@@ -160,6 +161,14 @@ public class FileSinkDesc extends AbstractOperatorDesc {
return ret;
}
+ /**
+ * @return the flag last stored by {@link #setHiveServerQuery(boolean)} (false by default).
+ * NOTE(review): FileSinkOperator.closeOp appears to read this through conf.isHiveServerQuery()
+ * before flushing ThriftJDBCBinarySerDe's buffered rows - confirm conf is this descriptor.
+ */
+ public boolean isHiveServerQuery() {
+ return this.isHiveServerQuery;
+ }
+
+ /**
+ * @param isHiveServerQuery value to record; SemanticAnalyzer passes
+ * SessionState.get().isHiveServerQuery()
+ */
+ public void setHiveServerQuery(boolean isHiveServerQuery) {
+ this.isHiveServerQuery = isHiveServerQuery;
+ }
+
@Explain(displayName = "directory", explainLevels = { Level.EXTENDED })
public Path getDirName() {
return dirName;
http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
index 2992568..c39a46f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
@@ -280,13 +280,13 @@ public final class PlanUtils {
}
public static TableDesc getDefaultQueryOutputTableDesc(String cols, String colTypes,
- String fileFormat) {
- TableDesc tblDesc = getTableDesc(LazySimpleSerDe.class, "" + Utilities.ctrlaCode, cols, colTypes,
- false, false, fileFormat);
- //enable escaping
+ String fileFormat, Class<? extends Deserializer> serdeClass) {
+ TableDesc tblDesc =
+ getTableDesc(serdeClass, "" + Utilities.ctrlaCode, cols, colTypes, false, false, fileFormat);
+ // enable escaping
tblDesc.getProperties().setProperty(serdeConstants.ESCAPE_CHAR, "\\");
tblDesc.getProperties().setProperty(serdeConstants.SERIALIZATION_ESCAPE_CRLF, "true");
- //enable extended nesting levels
+ // enable extended nesting levels
tblDesc.getProperties().setProperty(
LazySerDeParameters.SERIALIZATION_EXTEND_ADDITIONAL_NESTING_LEVELS, "true");
return tblDesc;
http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/serde/pom.xml
----------------------------------------------------------------------
diff --git a/serde/pom.xml b/serde/pom.xml
index cea7fce..9f50764 100644
--- a/serde/pom.xml
+++ b/serde/pom.xml
@@ -41,6 +41,11 @@
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
+ <artifactId>hive-service-rpc</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
<artifactId>hive-shims</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/serde/src/java/org/apache/hadoop/hive/serde2/DefaultFetchFormatter.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/DefaultFetchFormatter.java b/serde/src/java/org/apache/hadoop/hive/serde2/DefaultFetchFormatter.java
new file mode 100644
index 0000000..3038037
--- /dev/null
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/DefaultFetchFormatter.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT;
+import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_NULL_FORMAT;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hive.common.util.ReflectionUtil;
+
+/**
+ * Fetch formatter that serializes each row with the SerDe named by
+ * {@code HiveConf.ConfVars.HIVEFETCHOUTPUTSERDE} and returns {@code toString()} of the
+ * serialized value, so every converted row is a String.
+ *
+ * NOTE(review): the type parameter {@code T} is not referenced anywhere in this class.
+ */
+public class DefaultFetchFormatter<T> implements FetchFormatter<String> {
+
+ // Created in initialize(); convert() dereferences it without a null check.
+ private SerDe mSerde;
+
+ /**
+ * Builds and stores the output SerDe.
+ *
+ * @param hconf configuration the SerDe class name is read from
+ * @param props properties consulted only when the SerDe is a DelimitedJSONSerDe
+ * @throws SerDeException if the SerDe class cannot be found (other failures may come from
+ * SerDeUtils.initializeSerDe)
+ */
+ @Override
+ public void initialize(Configuration hconf, Properties props) throws SerDeException {
+ mSerde = initializeSerde(hconf, props);
+ }
+
+ /**
+ * Loads the class named by HIVEFETCHOUTPUTSERDE, instantiates it and initializes it.
+ * For a DelimitedJSONSerDe the serialization format and null format are copied from
+ * {@code props}; any other SerDe is initialized with empty properties.
+ */
+ private SerDe initializeSerde(Configuration conf, Properties props) throws SerDeException {
+ String serdeName = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEFETCHOUTPUTSERDE);
+ Class<? extends SerDe> serdeClass;
+ try {
+ serdeClass =
+ Class.forName(serdeName, true, JavaUtils.getClassLoader()).asSubclass(SerDe.class);
+ } catch (ClassNotFoundException e) {
+ throw new SerDeException(e);
+ }
+ // NOTE(review): this line used to carry a remark about a cast needed for Hadoop 0.17
+ // compatibility; no cast remains here, so that remark looks stale.
+ SerDe serde = ReflectionUtil.newInstance(serdeClass, null);
+ Properties serdeProps = new Properties();
+ if (serde instanceof DelimitedJSONSerDe) {
+ // Properties.put rejects null values, so both keys are expected to be present in props.
+ serdeProps.put(SERIALIZATION_FORMAT, props.getProperty(SERIALIZATION_FORMAT));
+ serdeProps.put(SERIALIZATION_NULL_FORMAT, props.getProperty(SERIALIZATION_NULL_FORMAT));
+ }
+ SerDeUtils.initializeSerDe(serde, conf, serdeProps, null);
+ return serde;
+ }
+
+ /**
+ * Serializes {@code row} with the configured SerDe and returns the result's string form.
+ *
+ * @param row the row object to format
+ * @param rowOI object inspector describing {@code row}
+ * @return {@code toString()} of the serialized row
+ */
+ @Override
+ public String convert(Object row, ObjectInspector rowOI) throws Exception {
+ return mSerde.serialize(row, rowOI).toString();
+ }
+
+ /** Nothing to release; intentionally empty. */
+ @Override
+ public void close() throws IOException {
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/serde/src/java/org/apache/hadoop/hive/serde2/FetchFormatter.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/FetchFormatter.java b/serde/src/java/org/apache/hadoop/hive/serde2/FetchFormatter.java
new file mode 100644
index 0000000..5cc93aa
--- /dev/null
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/FetchFormatter.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2;
+
+import java.io.Closeable;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+/**
+ * internal-use only
+ *
+ * Used in ListSinkOperator for formatting final output.
+ *
+ * @param <T> type of the value produced for each row by {@link #convert}
+ */
+public interface FetchFormatter<T> extends Closeable {
+
+ /**
+ * Prepares the formatter for use.
+ *
+ * @param hconf configuration passed in by the caller
+ * @param props formatter properties; ListSinkOperator fills in the serialization format and
+ * null format, and implementations may ignore them
+ */
+ void initialize(Configuration hconf, Properties props) throws Exception;
+
+ /**
+ * Converts one row into the formatter's output representation.
+ *
+ * @param row the row object to convert
+ * @param rowOI object inspector describing {@code row}
+ * @return the converted row
+ */
+ T convert(Object row, ObjectInspector rowOI) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/serde/src/java/org/apache/hadoop/hive/serde2/NoOpFetchFormatter.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/NoOpFetchFormatter.java b/serde/src/java/org/apache/hadoop/hive/serde2/NoOpFetchFormatter.java
new file mode 100644
index 0000000..91929f1
--- /dev/null
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/NoOpFetchFormatter.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+/**
+ * A No-op fetch formatter.
+ * ListSinkOperator uses this when reading from the destination table which has data serialized by
+ * ThriftJDBCBinarySerDe to a SequenceFile.
+ *
+ * NOTE(review): the type parameter {@code T} is not referenced anywhere in this class.
+ */
+public class NoOpFetchFormatter<T> implements FetchFormatter<Object> {
+
+ /** No state to set up; both arguments are ignored. */
+ @Override
+ public void initialize(Configuration hconf, Properties props) throws SerDeException {
+ }
+
+ // Performs no formatting of the row itself: the row is handed back untouched, but wrapped in
+ // a single-element Object[] (it is NOT returned bare). This formatter is only meant to be used
+ // when ThriftJDBCBinarySerDe has already serialized the rows to thrift-able objects.
+ @Override
+ public Object convert(Object row, ObjectInspector rowOI) throws Exception {
+ return new Object[] { row };
+ }
+
+ /** Nothing to release; intentionally empty. */
+ @Override
+ public void close() throws IOException {
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java b/serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java
index 90439a2..6e08dfd 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java
@@ -71,7 +71,8 @@ public final class SerDeUtils {
// lower case null is used within json objects
private static final String JSON_NULL = "null";
-
+ public static final String LIST_SINK_OUTPUT_FORMATTER = "list.sink.output.formatter";
+ public static final String LIST_SINK_OUTPUT_PROTOCOL = "list.sink.output.protocol";
public static final Logger LOG = LoggerFactory.getLogger(SerDeUtils.class.getName());
/**
http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ColumnBuffer.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ColumnBuffer.java b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ColumnBuffer.java
new file mode 100644
index 0000000..929c405
--- /dev/null
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ColumnBuffer.java
@@ -0,0 +1,439 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.thrift;
+
+import java.nio.ByteBuffer;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.hive.service.rpc.thrift.TBinaryColumn;
+import org.apache.hive.service.rpc.thrift.TBoolColumn;
+import org.apache.hive.service.rpc.thrift.TByteColumn;
+import org.apache.hive.service.rpc.thrift.TColumn;
+import org.apache.hive.service.rpc.thrift.TDoubleColumn;
+import org.apache.hive.service.rpc.thrift.TI16Column;
+import org.apache.hive.service.rpc.thrift.TI32Column;
+import org.apache.hive.service.rpc.thrift.TI64Column;
+import org.apache.hive.service.rpc.thrift.TStringColumn;
+
+import com.google.common.primitives.Booleans;
+import com.google.common.primitives.Bytes;
+import com.google.common.primitives.Doubles;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+import com.google.common.primitives.Shorts;
+
+/**
+ * ColumnBuffer
+ */
+public class ColumnBuffer extends AbstractList {
+
+ private static final int DEFAULT_SIZE = 100;
+
+ private final Type type;
+
+ private BitSet nulls;
+
+ private int size;
+ private boolean[] boolVars;
+ private byte[] byteVars;
+ private short[] shortVars;
+ private int[] intVars;
+ private long[] longVars;
+ private double[] doubleVars;
+ private List<String> stringVars;
+ private List<ByteBuffer> binaryVars;
+
+ /**
+ * Wraps already-populated column storage without copying it.
+ *
+ * @param type column type; selects which typed field {@code values} is stored in
+ * @param nulls null bitmap, kept by reference
+ * @param values a primitive array (boolean[], byte[], short[], int[], long[], double[]) or a
+ * List (of ByteBuffer for BINARY_TYPE, of String for STRING_TYPE) matching {@code type};
+ * a mismatch fails with a ClassCastException from the casts below
+ * @throws IllegalStateException if {@code type} is none of the eight handled types
+ * (FLOAT_TYPE, for example, is not handled here)
+ */
+ public ColumnBuffer(Type type, BitSet nulls, Object values) {
+ this.type = type;
+ this.nulls = nulls;
+ if (type == Type.BOOLEAN_TYPE) {
+ boolVars = (boolean[]) values;
+ size = boolVars.length;
+ } else if (type == Type.TINYINT_TYPE) {
+ byteVars = (byte[]) values;
+ size = byteVars.length;
+ } else if (type == Type.SMALLINT_TYPE) {
+ shortVars = (short[]) values;
+ size = shortVars.length;
+ } else if (type == Type.INT_TYPE) {
+ intVars = (int[]) values;
+ size = intVars.length;
+ } else if (type == Type.BIGINT_TYPE) {
+ longVars = (long[]) values;
+ size = longVars.length;
+ } else if (type == Type.DOUBLE_TYPE) {
+ doubleVars = (double[]) values;
+ size = doubleVars.length;
+ } else if (type == Type.BINARY_TYPE) {
+ // unchecked cast: element type cannot be verified at runtime
+ binaryVars = (List<ByteBuffer>) values;
+ size = binaryVars.size();
+ } else if (type == Type.STRING_TYPE) {
+ // unchecked cast: element type cannot be verified at runtime
+ stringVars = (List<String>) values;
+ size = stringVars.size();
+ } else {
+ throw new IllegalStateException("invalid union object");
+ }
+ }
+
+ /**
+ * Creates an empty buffer for the given column type.
+ * Primitive-backed types get an array of DEFAULT_SIZE slots; BINARY and STRING get an empty
+ * ArrayList. {@code size} is not assigned here, so it keeps its default of 0.
+ *
+ * @param type requested column type. FLOAT_TYPE is stored as DOUBLE_TYPE, and any type without
+ * its own case is stored as STRING_TYPE; {@link #getType()} reports the stored type
+ */
+ public ColumnBuffer(Type type) {
+ nulls = new BitSet();
+ switch (type) {
+ case BOOLEAN_TYPE:
+ boolVars = new boolean[DEFAULT_SIZE];
+ break;
+ case TINYINT_TYPE:
+ byteVars = new byte[DEFAULT_SIZE];
+ break;
+ case SMALLINT_TYPE:
+ shortVars = new short[DEFAULT_SIZE];
+ break;
+ case INT_TYPE:
+ intVars = new int[DEFAULT_SIZE];
+ break;
+ case BIGINT_TYPE:
+ longVars = new long[DEFAULT_SIZE];
+ break;
+ case FLOAT_TYPE:
+ case DOUBLE_TYPE:
+ // floats share the double storage, so the recorded type becomes DOUBLE_TYPE
+ type = Type.DOUBLE_TYPE;
+ doubleVars = new double[DEFAULT_SIZE];
+ break;
+ case BINARY_TYPE:
+ binaryVars = new ArrayList<ByteBuffer>();
+ break;
+ default:
+ // every remaining type is kept in string form
+ type = Type.STRING_TYPE;
+ stringVars = new ArrayList<String>();
+ }
+ // assigned last because the switch may have rewritten the parameter
+ this.type = type;
+ }
+
+ /**
+ * Builds a buffer from a thrift TColumn union by inspecting which member is set.
+ * Primitive members are copied into arrays; the binary and string value lists are kept by
+ * reference. The null bitmap is decoded from the member's packed null bytes.
+ *
+ * @param colValues thrift column to read from
+ * @throws IllegalStateException if none of the eight known union members is set
+ */
+ public ColumnBuffer(TColumn colValues) {
+ if (colValues.isSetBoolVal()) {
+ type = Type.BOOLEAN_TYPE;
+ nulls = toBitset(colValues.getBoolVal().getNulls());
+ boolVars = Booleans.toArray(colValues.getBoolVal().getValues());
+ size = boolVars.length;
+ } else if (colValues.isSetByteVal()) {
+ type = Type.TINYINT_TYPE;
+ nulls = toBitset(colValues.getByteVal().getNulls());
+ byteVars = Bytes.toArray(colValues.getByteVal().getValues());
+ size = byteVars.length;
+ } else if (colValues.isSetI16Val()) {
+ type = Type.SMALLINT_TYPE;
+ nulls = toBitset(colValues.getI16Val().getNulls());
+ shortVars = Shorts.toArray(colValues.getI16Val().getValues());
+ size = shortVars.length;
+ } else if (colValues.isSetI32Val()) {
+ type = Type.INT_TYPE;
+ nulls = toBitset(colValues.getI32Val().getNulls());
+ intVars = Ints.toArray(colValues.getI32Val().getValues());
+ size = intVars.length;
+ } else if (colValues.isSetI64Val()) {
+ type = Type.BIGINT_TYPE;
+ nulls = toBitset(colValues.getI64Val().getNulls());
+ longVars = Longs.toArray(colValues.getI64Val().getValues());
+ size = longVars.length;
+ } else if (colValues.isSetDoubleVal()) {
+ type = Type.DOUBLE_TYPE;
+ nulls = toBitset(colValues.getDoubleVal().getNulls());
+ doubleVars = Doubles.toArray(colValues.getDoubleVal().getValues());
+ size = doubleVars.length;
+ } else if (colValues.isSetBinaryVal()) {
+ type = Type.BINARY_TYPE;
+ nulls = toBitset(colValues.getBinaryVal().getNulls());
+ binaryVars = colValues.getBinaryVal().getValues();
+ size = binaryVars.size();
+ } else if (colValues.isSetStringVal()) {
+ type = Type.STRING_TYPE;
+ nulls = toBitset(colValues.getStringVal().getNulls());
+ stringVars = colValues.getStringVal().getValues();
+ size = stringVars.size();
+ } else {
+ throw new IllegalStateException("invalid union object");
+ }
+ }
+
+ /**
+ * Splits the rows in {@code [start, end)} off into a new ColumnBuffer and shrinks this buffer
+ * to the rows from {@code end} up to the old size. Rows before {@code end} that are not part
+ * of the subset are dropped from this buffer as well.
+ *
+ * The remaining null bitmap is re-based at {@code end}, the same offset used for the remaining
+ * values, so that bit {@code i} keeps describing remaining value {@code i}. Slicing it from
+ * {@code start} instead would leave the null flags shifted relative to the values.
+ *
+ * @param start index of the first row to extract (inclusive)
+ * @param end index one past the last row to extract (exclusive)
+ * @return a buffer of the same type holding the extracted rows and their null flags; for
+ * BINARY and STRING columns it is backed by a subList view of this buffer's list
+ * @throws IllegalStateException if the column type is not one of the eight stored types
+ */
+ public ColumnBuffer extractSubset(int start, int end) {
+ BitSet subNulls = nulls.get(start, end);
+ if (type == Type.BOOLEAN_TYPE) {
+ ColumnBuffer subset =
+ new ColumnBuffer(type, subNulls, Arrays.copyOfRange(boolVars, start, end));
+ boolVars = Arrays.copyOfRange(boolVars, end, size);
+ nulls = nulls.get(end, size);
+ size = boolVars.length;
+ return subset;
+ }
+ if (type == Type.TINYINT_TYPE) {
+ ColumnBuffer subset =
+ new ColumnBuffer(type, subNulls, Arrays.copyOfRange(byteVars, start, end));
+ byteVars = Arrays.copyOfRange(byteVars, end, size);
+ nulls = nulls.get(end, size);
+ size = byteVars.length;
+ return subset;
+ }
+ if (type == Type.SMALLINT_TYPE) {
+ ColumnBuffer subset =
+ new ColumnBuffer(type, subNulls, Arrays.copyOfRange(shortVars, start, end));
+ shortVars = Arrays.copyOfRange(shortVars, end, size);
+ nulls = nulls.get(end, size);
+ size = shortVars.length;
+ return subset;
+ }
+ if (type == Type.INT_TYPE) {
+ ColumnBuffer subset =
+ new ColumnBuffer(type, subNulls, Arrays.copyOfRange(intVars, start, end));
+ intVars = Arrays.copyOfRange(intVars, end, size);
+ nulls = nulls.get(end, size);
+ size = intVars.length;
+ return subset;
+ }
+ if (type == Type.BIGINT_TYPE) {
+ ColumnBuffer subset =
+ new ColumnBuffer(type, subNulls, Arrays.copyOfRange(longVars, start, end));
+ longVars = Arrays.copyOfRange(longVars, end, size);
+ nulls = nulls.get(end, size);
+ size = longVars.length;
+ return subset;
+ }
+ if (type == Type.DOUBLE_TYPE) {
+ ColumnBuffer subset =
+ new ColumnBuffer(type, subNulls, Arrays.copyOfRange(doubleVars, start, end));
+ doubleVars = Arrays.copyOfRange(doubleVars, end, size);
+ nulls = nulls.get(end, size);
+ size = doubleVars.length;
+ return subset;
+ }
+ if (type == Type.BINARY_TYPE) {
+ ColumnBuffer subset = new ColumnBuffer(type, subNulls, binaryVars.subList(start, end));
+ binaryVars = binaryVars.subList(end, binaryVars.size());
+ nulls = nulls.get(end, size);
+ size = binaryVars.size();
+ return subset;
+ }
+ if (type == Type.STRING_TYPE) {
+ ColumnBuffer subset = new ColumnBuffer(type, subNulls, stringVars.subList(start, end));
+ stringVars = stringVars.subList(end, stringVars.size());
+ nulls = nulls.get(end, size);
+ size = stringVars.size();
+ return subset;
+ }
+ throw new IllegalStateException("invalid union object");
+ }
+
+ // Single-bit masks indexed by bit position within a byte, least significant bit first.
+ private static final byte[] MASKS = new byte[] {
+ 0x01, 0x02, 0x04, 0x08, 0x10, 0x20, 0x40, (byte)0x80
+ };
+
+ /**
+ * Decodes a packed null mask into a BitSet: bit {@code i} of the result is set when bit
+ * {@code i % 8} (LSB first) of byte {@code i / 8} is set.
+ *
+ * @param nulls packed null mask; every bit of every byte is examined
+ * @return a new BitSet with one bit per input bit
+ */
+ private static BitSet toBitset(byte[] nulls) {
+ BitSet bitset = new BitSet();
+ int bits = nulls.length * 8;
+ for (int i = 0; i < bits; i++) {
+ bitset.set(i, (nulls[i / 8] & MASKS[i % 8]) != 0);
+ }
+ return bitset;
+ }
+
+  /**
+   * Packs a {@link BitSet} into bytes, lowest bit first within each byte. The result always
+   * holds {@code 1 + bitset.length() / 8} bytes, so an empty set yields a single zero byte.
+   */
+  private static byte[] toBinary(BitSet bitset) {
+    byte[] packed = new byte[1 + (bitset.length() / 8)];
+    // Only set bits contribute to the output, so visit those and skip the clear ones.
+    for (int bit = bitset.nextSetBit(0); bit >= 0; bit = bitset.nextSetBit(bit + 1)) {
+      packed[bit / 8] |= MASKS[bit % 8];
+    }
+    return packed;
+  }
+
+  /** Returns the type tag this buffer uses to select its backing storage. */
+  public Type getType() {
+    return this.type;
+  }
+
+  /**
+   * Returns the value stored at {@code index}, or {@code null} when that position is flagged
+   * in the null mask or the buffer's type has no matching case below. Binary values are
+   * returned as the {@code byte[]} behind the stored {@link ByteBuffer}.
+   */
+  @Override
+  public Object get(int index) {
+    if (nulls.get(index)) {
+      return null;
+    }
+    switch (type) {
+    case BOOLEAN_TYPE:
+      return boolVars[index];
+    case TINYINT_TYPE:
+      return byteVars[index];
+    case SMALLINT_TYPE:
+      return shortVars[index];
+    case INT_TYPE:
+      return intVars[index];
+    case BIGINT_TYPE:
+      return longVars[index];
+    case DOUBLE_TYPE:
+      return doubleVars[index];
+    case STRING_TYPE:
+      return stringVars.get(index);
+    case BINARY_TYPE:
+      return binaryVars.get(index).array();
+    default:
+      // Any other type tag has no backing storage to read from.
+      return null;
+    }
+  }
+
+  /** Returns the current value of the {@code size} counter maintained by this buffer. */
+  @Override
+  public int size() {
+    return this.size;
+  }
+
+ public TColumn toTColumn() {
+ TColumn value = new TColumn();
+ ByteBuffer nullMasks = ByteBuffer.wrap(toBinary(nulls));
+ switch (type) {
+ case BOOLEAN_TYPE:
+ value.setBoolVal(new TBoolColumn(Booleans.asList(Arrays.copyOfRange(boolVars, 0, size)),
+ nullMasks));
+ break;
+ case TINYINT_TYPE:
+ value.setByteVal(new TByteColumn(Bytes.asList(Arrays.copyOfRange(byteVars, 0, size)),
+ nullMasks));
+ break;
+ case SMALLINT_TYPE:
+ value.setI16Val(new TI16Column(Shorts.asList(Arrays.copyOfRange(shortVars, 0, size)),
+ nullMasks));
+ break;
+ case INT_TYPE:
+ value.setI32Val(new TI32Column(Ints.asList(Arrays.copyOfRange(intVars, 0, size)), nullMasks));
+ break;
+ case BIGINT_TYPE:
+ value
+ .setI64Val(new TI64Column(Longs.asList(Arrays.copyOfRange(longVars, 0, size)), nullMasks));
+ break;
+ case DOUBLE_TYPE:
+ value.setDoubleVal(new TDoubleColumn(Doubles.asList(Arrays.copyOfRange(doubleVars, 0, size)),
+ nullMasks));
+ break;
+ case STRING_TYPE:
+ value.setStringVal(new TStringColumn(stringVars, nullMasks));
+ break;
+ case BINARY_TYPE:
+ value.setBinaryVal(new TBinaryColumn(binaryVars, nullMasks));
+ break;
+ }
+ return value;
+ }
+
+  /** Placeholder stored in the binary list for a null field. */
+  private static final ByteBuffer EMPTY_BINARY = ByteBuffer.allocate(0);
+  /** Placeholder stored in the string list for a null field. */
+  private static final String EMPTY_STRING = "";
+
+  /**
+   * Appends {@code field} using this buffer's own type tag.
+   *
+   * @param field value to append; {@code null} marks the position as null
+   */
+  public void addValue(Object field) {
+    addValue(type, field);
+  }
+
+  /**
+   * Appends {@code field} to the storage selected by {@code type} and records in the null
+   * mask whether it was {@code null}. A null field still occupies a slot, filled with a
+   * placeholder ({@code true}, zero, an empty buffer or an empty string). Float values are
+   * parsed from their string form and stored in the double array. Any type without a
+   * dedicated case is stored as {@code String.valueOf(field)}.
+   *
+   * @param type  type tag selecting the backing storage
+   * @param field value to append; {@code null} marks the position as null
+   * @throws ClassCastException if {@code field} is not of the boxed type the case expects
+   * @throws NumberFormatException if a float field's string form is not a parsable double
+   */
+  public void addValue(Type type, Object field) {
+    final boolean isNull = field == null;
+    switch (type) {
+    case BOOLEAN_TYPE:
+      nulls.set(size, isNull);
+      boolVars()[size] = isNull || (Boolean) field;
+      break;
+    case TINYINT_TYPE:
+      nulls.set(size, isNull);
+      byteVars()[size] = isNull ? 0 : (Byte) field;
+      break;
+    case SMALLINT_TYPE:
+      nulls.set(size, isNull);
+      shortVars()[size] = isNull ? 0 : (Short) field;
+      break;
+    case INT_TYPE:
+      nulls.set(size, isNull);
+      intVars()[size] = isNull ? 0 : (Integer) field;
+      break;
+    case BIGINT_TYPE:
+      nulls.set(size, isNull);
+      longVars()[size] = isNull ? 0 : (Long) field;
+      break;
+    case FLOAT_TYPE:
+      nulls.set(size, isNull);
+      // parseDouble yields the primitive directly instead of going through the deprecated
+      // boxing constructor and unboxing again.
+      doubleVars()[size] = isNull ? 0 : Double.parseDouble(field.toString());
+      break;
+    case DOUBLE_TYPE:
+      nulls.set(size, isNull);
+      doubleVars()[size] = isNull ? 0 : (Double) field;
+      break;
+    case BINARY_TYPE:
+      // List-backed storage indexes the null mask by the list's own length.
+      nulls.set(binaryVars.size(), isNull);
+      binaryVars.add(isNull ? EMPTY_BINARY : ByteBuffer.wrap((byte[]) field));
+      break;
+    default:
+      nulls.set(stringVars.size(), isNull);
+      stringVars.add(isNull ? EMPTY_STRING : String.valueOf(field));
+      break;
+    }
+    size++;
+  }
+
+  /**
+   * Returns the boolean backing array, enlarging it first when it has no free slot at index
+   * {@code size}.
+   */
+  private boolean[] boolVars() {
+    if (boolVars.length == size) {
+      // Doubling a zero-length array gives zero length again and would leave no room for the
+      // pending write, so grow to at least one element.
+      boolVars = Arrays.copyOf(boolVars, Math.max(1, size << 1));
+    }
+    return boolVars;
+  }
+
+  /**
+   * Returns the byte backing array, enlarging it first when it has no free slot at index
+   * {@code size}.
+   */
+  private byte[] byteVars() {
+    if (byteVars.length == size) {
+      // Doubling a zero-length array gives zero length again and would leave no room for the
+      // pending write, so grow to at least one element.
+      byteVars = Arrays.copyOf(byteVars, Math.max(1, size << 1));
+    }
+    return byteVars;
+  }
+
+  /**
+   * Returns the short backing array, enlarging it first when it has no free slot at index
+   * {@code size}.
+   */
+  private short[] shortVars() {
+    if (shortVars.length == size) {
+      // Doubling a zero-length array gives zero length again and would leave no room for the
+      // pending write, so grow to at least one element.
+      shortVars = Arrays.copyOf(shortVars, Math.max(1, size << 1));
+    }
+    return shortVars;
+  }
+
+  /**
+   * Returns the int backing array, enlarging it first when it has no free slot at index
+   * {@code size}.
+   */
+  private int[] intVars() {
+    if (intVars.length == size) {
+      // Doubling a zero-length array gives zero length again and would leave no room for the
+      // pending write, so grow to at least one element.
+      intVars = Arrays.copyOf(intVars, Math.max(1, size << 1));
+    }
+    return intVars;
+  }
+
+  /**
+   * Returns the long backing array, enlarging it first when it has no free slot at index
+   * {@code size}.
+   */
+  private long[] longVars() {
+    if (longVars.length == size) {
+      // Doubling a zero-length array gives zero length again and would leave no room for the
+      // pending write, so grow to at least one element.
+      longVars = Arrays.copyOf(longVars, Math.max(1, size << 1));
+    }
+    return longVars;
+  }
+
+  /**
+   * Returns the double backing array, enlarging it first when it has no free slot at index
+   * {@code size}.
+   */
+  private double[] doubleVars() {
+    if (doubleVars.length == size) {
+      // Doubling a zero-length array gives zero length again and would leave no room for the
+      // pending write, so grow to at least one element.
+      doubleVars = Arrays.copyOf(doubleVars, Math.max(1, size << 1));
+    }
+    return doubleVars;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftFormatter.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftFormatter.java b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftFormatter.java
new file mode 100644
index 0000000..a4c120e
--- /dev/null
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftFormatter.java
@@ -0,0 +1,40 @@
+package org.apache.hadoop.hive.serde2.thrift;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde2.FetchFormatter;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+
+/**
+ * {@link FetchFormatter} that passes every non-null struct field of a row through
+ * {@code SerDeUtils.toThriftPayload} and returns the results as an {@code Object[]}, one
+ * entry per field.
+ */
+public class ThriftFormatter implements FetchFormatter<Object> {
+
+  // Read from SerDeUtils.LIST_SINK_OUTPUT_PROTOCOL in initialize(); 0 when the key is unset.
+  int protocol;
+
+  /** Reads the output protocol setting from {@code hconf}; {@code props} is not consulted. */
+  @Override
+  public void initialize(Configuration hconf, Properties props) throws Exception {
+    protocol = hconf.getInt(SerDeUtils.LIST_SINK_OUTPUT_PROTOCOL, 0);
+  }
+
+  /**
+   * Converts one row into an array of per-field payloads.
+   *
+   * @param row   the row object to read fields from
+   * @param rowOI inspector for {@code row}; must be a {@link StructObjectInspector}
+   * @return an {@code Object[]} with one slot per struct field; null fields stay {@code null}
+   */
+  @Override
+  public Object convert(Object row, ObjectInspector rowOI) throws Exception {
+    StructObjectInspector inspector = (StructObjectInspector) rowOI;
+    List<? extends StructField> fieldRefs = inspector.getAllStructFieldRefs();
+    Object[] payloads = new Object[fieldRefs.size()];
+    int column = 0;
+    for (StructField ref : fieldRefs) {
+      Object raw = inspector.getStructFieldData(row, ref);
+      if (raw != null) {
+        payloads[column] =
+            SerDeUtils.toThriftPayload(raw, ref.getFieldObjectInspector(), protocol);
+      }
+      column++;
+    }
+    return payloads;
+  }
+
+  /** Nothing to release. */
+  @Override
+  public void close() throws IOException {
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java
new file mode 100644
index 0000000..5c31974
--- /dev/null
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.thrift;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hive.service.rpc.thrift.TColumn;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SerDe that serializes final query output directly into Thrift column objects. Use it only
+ * for final result sets.
+ * It is used if HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS is set to true. Rows coming in
+ * from FileSink are buffered column-wise until the configured maximum
+ * (HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE) is reached, or until all rows are finished
+ * and FileSink.closeOp() is called; the buffered columns are then written out as one batch.
+ */
+public class ThriftJDBCBinarySerDe extends AbstractSerDe {
+  public static final Logger LOG = LoggerFactory.getLogger(ThriftJDBCBinarySerDe.class.getName());
+  private List<String> columnNames;
+  private List<TypeInfo> columnTypes;
+  private ColumnBuffer[] columnBuffers;
+  private TypeInfo rowTypeInfo;
+  private ArrayList<Object> row;
+  // Reused for every batch; serializeBatch() overwrites its contents.
+  private BytesWritable serializedBytesWritable = new BytesWritable();
+  private ByteStream.Output output = new ByteStream.Output();
+  // Compact-protocol writer whose transport appends to 'output'.
+  private TProtocol protocol = new TCompactProtocol(new TIOStreamTransport(output));
+  private ThriftFormatter thriftFormatter = new ThriftFormatter();
+  // Number of rows to buffer before a batch is emitted; set in initialize().
+  private int maxBufferedRows;
+  // Rows buffered since the last batch was emitted.
+  private int count;
+  private StructObjectInspector rowObjectInspector;
+
+
+  /**
+   * Reads the batch size from {@code conf} and the column names and types from {@code tbl},
+   * then prepares empty column buffers and the row formatter.
+   *
+   * @throws SerDeException if the formatter fails to initialize
+   */
+  @Override
+  public void initialize(Configuration conf, Properties tbl) throws SerDeException {
+    maxBufferedRows =
+        HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE);
+    // Get column names and types
+    String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS);
+    String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
+    if (columnNameProperty.length() == 0) {
+      columnNames = new ArrayList<String>();
+    } else {
+      columnNames = Arrays.asList(columnNameProperty.split(","));
+    }
+    if (columnTypeProperty.length() == 0) {
+      columnTypes = new ArrayList<TypeInfo>();
+    } else {
+      columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+    }
+    rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
+    rowObjectInspector =
+        (StructObjectInspector) TypeInfoUtils
+            .getStandardWritableObjectInspectorFromTypeInfo(rowTypeInfo);
+
+    initializeRowAndColumns();
+    try {
+      thriftFormatter.initialize(conf, tbl);
+    } catch (Exception e) {
+      // Surface the failure to the caller instead of building the exception and dropping it.
+      throw new SerDeException(e);
+    }
+  }
+
+  @Override
+  public Class<? extends Writable> getSerializedClass() {
+    return BytesWritable.class;
+  }
+
+  /**
+   * Writes every buffered column to the output stream, resets the buffers and the row
+   * counter, and returns the bytes wrapped in the reusable writable.
+   */
+  private Writable serializeBatch() throws SerDeException {
+    // Start a fresh batch count whether the flush was caused by a full buffer or a null row.
+    count = 0;
+    output.reset();
+    for (int i = 0; i < columnBuffers.length; i++) {
+      TColumn tColumn = columnBuffers[i].toTColumn();
+      try {
+        tColumn.write(protocol);
+      } catch (TException e) {
+        throw new SerDeException(e);
+      }
+    }
+    initializeRowAndColumns();
+    serializedBytesWritable.set(output.getData(), 0, output.getLength());
+    return serializedBytesWritable;
+  }
+
+  // Use the columnNames to initialize the reusable row object and the columnBuffers. The
+  // buffers are recreated after each batch so that rows already written are not emitted again
+  // when closeOp() triggers the final flush.
+  private void initializeRowAndColumns() {
+    row = new ArrayList<Object>(columnNames.size());
+    for (int i = 0; i < columnNames.size(); i++) {
+      row.add(null);
+    }
+    // Initialize column buffers
+    columnBuffers = new ColumnBuffer[columnNames.size()];
+    for (int i = 0; i < columnBuffers.length; i++) {
+      columnBuffers[i] = new ColumnBuffer(Type.getType(columnTypes.get(i)));
+    }
+  }
+
+  /**
+   * Buffers one row, and writes the buffered TColumn objects to the underlying stream of
+   * TProtocol once a batch is complete.
+   *
+   * @param obj          the row to buffer, or {@code null} to flush whatever is buffered
+   * @param objInspector inspector for {@code obj}
+   * @return the serialized batch when a flush happened, otherwise {@code null}
+   * @throws SerDeException if formatting the row or writing the batch fails
+   */
+  @Override
+  public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
+    // If row is null, it means there are no more rows (closeOp()).
+    if (obj == null) {
+      return serializeBatch();
+    }
+    count += 1;
+    try {
+      Object[] formattedRow = (Object[]) thriftFormatter.convert(obj, objInspector);
+      for (int i = 0; i < columnNames.size(); i++) {
+        columnBuffers[i].addValue(formattedRow[i]);
+      }
+    } catch (Exception e) {
+      throw new SerDeException(e);
+    }
+    // The other flush trigger: the buffer is full.
+    if (count == maxBufferedRows) {
+      return serializeBatch();
+    }
+    return null;
+  }
+
+  @Override
+  public SerDeStats getSerDeStats() {
+    return null;
+  }
+
+  /**
+   * Return the bytes from this writable blob.
+   * Eventually the client of this method will interpret the bytes using the Thrift Protocol.
+   * NOTE(review): getBytes() hands back the writable's own array; confirm that callers bound
+   * their reads by the valid length rather than by the array length.
+   */
+  @Override
+  public Object deserialize(Writable blob) throws SerDeException {
+    return ((BytesWritable) blob).getBytes();
+  }
+
+  @Override
+  public ObjectInspector getObjectInspector() throws SerDeException {
+    return rowObjectInspector;
+  }
+
+}