Posted to commits@hive.apache.org by ha...@apache.org on 2013/03/12 19:22:03 UTC
svn commit: r1455659 [10/11] - in /hive/trunk: ./
ant/src/org/apache/hadoop/hive/ant/ bin/ bin/ext/ cli/ common/
common/src/gen/ common/src/gen/org/ common/src/gen/org/apache/
common/src/gen/org/apache/hive/ common/src/gen/org/apache/hive/common/
commo...
Added: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java?rev=1455659&view=auto
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java (added)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java Tue Mar 12 18:22:00 2013
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.operation;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.ql.processors.CommandProcessor;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.service.cli.FetchOrientation;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.OperationState;
+import org.apache.hive.service.cli.RowSet;
+import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.session.HiveSession;
+
+/**
+ * HiveCommandOperation.
+ *
+ */
+public abstract class HiveCommandOperation extends ExecuteStatementOperation {
+ private CommandProcessorResponse response;
+ private CommandProcessor commandProcessor;
+ private TableSchema resultSchema = null;
+
+  /**
+   * Processors other than the Hive query processor (Driver) write their output to
+   * session.out (a temp file) first; the fetchOne/fetchN/fetchAll functions then read
+   * the results back from that file.
+   */
+ private BufferedReader resultReader;
+
+
+ protected HiveCommandOperation(HiveSession parentSession, String statement, Map<String, String> confOverlay) {
+ super(parentSession, statement, confOverlay);
+ setupSessionIO(parentSession.getSessionState());
+ }
+
+ private void setupSessionIO(SessionState sessionState) {
+ try {
+ LOG.info("Putting temp output to file " + sessionState.getTmpOutputFile().toString());
+ sessionState.in = null; // hive server's session input stream is not used
+ // open a per-session file in auto-flush mode for writing temp results
+ sessionState.out = new PrintStream(new FileOutputStream(sessionState.getTmpOutputFile()), true, "UTF-8");
+ // TODO: for hadoop jobs, progress is printed out to session.err,
+ // we should find a way to feed back job progress to client
+ sessionState.err = new PrintStream(System.err, true, "UTF-8");
+ } catch (IOException e) {
+ LOG.error("Error in creating temp output file ", e);
+ try {
+ sessionState.in = null;
+ sessionState.out = new PrintStream(System.out, true, "UTF-8");
+ sessionState.err = new PrintStream(System.err, true, "UTF-8");
+ } catch (UnsupportedEncodingException ee) {
+ ee.printStackTrace();
+ sessionState.out = null;
+ sessionState.err = null;
+ }
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hive.service.cli.operation.Operation#run()
+ */
+ @Override
+ public void run() throws HiveSQLException {
+ setState(OperationState.RUNNING);
+ try {
+      String command = getStatement().trim();
+      String[] tokens = command.split("\\s");
+      String commandArgs = command.substring(tokens[0].length()).trim();
+
+      response = getCommandProcessor().run(commandArgs);
+      int returnCode = response.getResponseCode();
+      if (returnCode != 0) {
+        throw new HiveSQLException("Error while processing statement: "
+            + response.getErrorMessage(), response.getSQLState(), returnCode);
+      }
+ Schema schema = response.getSchema();
+ if (schema != null) {
+ setHasResultSet(true);
+ resultSchema = new TableSchema(schema);
+ } else {
+ setHasResultSet(false);
+ resultSchema = new TableSchema();
+ }
+    } catch (HiveSQLException e) {
+      setState(OperationState.ERROR);
+      throw e;
+    } catch (Exception e) {
+      setState(OperationState.ERROR);
+      throw new HiveSQLException("Error running query: " + e.toString());
+    }
+ setState(OperationState.FINISHED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hive.service.cli.operation.Operation#close()
+ */
+ @Override
+ public void close() throws HiveSQLException {
+ setState(OperationState.CLOSED);
+ cleanTmpFile();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hive.service.cli.operation.Operation#getResultSetSchema()
+ */
+ @Override
+ public TableSchema getResultSetSchema() throws HiveSQLException {
+ return resultSchema;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hive.service.cli.operation.Operation#getNextRowSet(org.apache.hive.service.cli.FetchOrientation, long)
+ */
+ @Override
+ public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
+ List<String> rows = readResults((int) maxRows);
+ RowSet rowSet = new RowSet();
+
+ for (String row : rows) {
+ rowSet.addRow(resultSchema, new String[] {row});
+ }
+ return rowSet;
+ }
+
+  /**
+   * Reads the temporary results for non-Hive (non-Driver) commands into a
+   * list of strings.
+   * @param nLines number of lines to read at once. If it is <= 0, then read all lines.
+   * @return the lines read from the temp output file
+   */
+ private List<String> readResults(int nLines) throws HiveSQLException {
+ if (resultReader == null) {
+ SessionState sessionState = getParentSession().getSessionState();
+ File tmp = sessionState.getTmpOutputFile();
+ try {
+ resultReader = new BufferedReader(new FileReader(tmp));
+ } catch (FileNotFoundException e) {
+ LOG.error("File " + tmp + " not found. ", e);
+ throw new HiveSQLException(e);
+ }
+ }
+
+ List<String> results = new ArrayList<String>();
+
+ for (int i = 0; i < nLines || nLines <= 0; ++i) {
+ try {
+ String line = resultReader.readLine();
+ if (line == null) {
+ // reached the end of the result file
+ break;
+ } else {
+ results.add(line);
+ }
+ } catch (IOException e) {
+ LOG.error("Reading temp results encountered an exception: ", e);
+ throw new HiveSQLException(e);
+ }
+ }
+ return results;
+ }
+
+  private void cleanTmpFile() {
+    if (resultReader != null) {
+      try {
+        // close the reader before deleting the file it is holding open
+        resultReader.close();
+      } catch (IOException e) {
+        LOG.warn("Failed to close temp result reader", e);
+      }
+      SessionState sessionState = getParentSession().getSessionState();
+      File tmp = sessionState.getTmpOutputFile();
+      tmp.delete();
+      resultReader = null;
+    }
+  }
+
+ protected CommandProcessor getCommandProcessor() {
+ return commandProcessor;
+ }
+
+ protected void setCommandProcessor(CommandProcessor commandProcessor) {
+ this.commandProcessor = commandProcessor;
+ }
+}
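
The temp-file round trip above (setupSessionIO writing session.out to a temp
file, readResults paging it back in batches) can be reproduced with plain
java.io, independent of Hive. A minimal, self-contained sketch; the class and
helper names are illustrative only:

    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.FileReader;
    import java.io.IOException;
    import java.io.PrintStream;
    import java.util.ArrayList;
    import java.util.List;

    public class TmpOutputDemo {
      public static void main(String[] args) throws IOException {
        File tmp = File.createTempFile("hive-cmd-out", ".txt");
        tmp.deleteOnExit();

        // Like setupSessionIO: an auto-flushing, UTF-8 PrintStream over the temp file.
        PrintStream out = new PrintStream(new FileOutputStream(tmp), true, "UTF-8");
        for (int i = 0; i < 5; i++) {
          out.println("row-" + i);
        }
        out.close();

        // Like getNextRowSet/readResults: page the file back a batch at a time.
        BufferedReader reader = new BufferedReader(new FileReader(tmp));
        for (List<String> page = readLines(reader, 2); !page.isEmpty();
            page = readLines(reader, 2)) {
          System.out.println("fetched page: " + page);
        }
        reader.close();
      }

      // Mirrors the loop in readResults: nLines <= 0 means "read everything".
      private static List<String> readLines(BufferedReader reader, int nLines)
          throws IOException {
        List<String> results = new ArrayList<String>();
        for (int i = 0; i < nLines || nLines <= 0; ++i) {
          String line = reader.readLine();
          if (line == null) {
            break; // end of the result file
          }
          results.add(line);
        }
        return results;
      }
    }
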
Added: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java?rev=1455659&view=auto
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java (added)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java Tue Mar 12 18:22:00 2013
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.operation;
+
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.OperationState;
+import org.apache.hive.service.cli.OperationType;
+import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.session.HiveSession;
+
+/**
+ * MetadataOperation.
+ *
+ */
+public abstract class MetadataOperation extends Operation {
+
+ protected static final String DEFAULT_HIVE_CATALOG = "";
+ protected static TableSchema RESULT_SET_SCHEMA;
+ private static final char SEARCH_STRING_ESCAPE = '\\';
+
+ protected MetadataOperation(HiveSession parentSession, OperationType opType) {
+ super(parentSession, opType);
+ setHasResultSet(true);
+ }
+
+
+ /* (non-Javadoc)
+ * @see org.apache.hive.service.cli.Operation#close()
+ */
+ @Override
+ public void close() throws HiveSQLException {
+ setState(OperationState.CLOSED);
+ }
+
+  /**
+   * Convert wildcards and escape sequences from the JDBC format to the DataNucleus/regex format.
+   */
+ protected String convertIdentifierPattern(final String pattern, boolean datanucleusFormat) {
+ if (pattern == null) {
+ return convertPattern("%", true);
+ } else {
+ return convertPattern(pattern, datanucleusFormat);
+ }
+ }
+
+  /**
+   * Convert wildcards and escape sequences in a schema pattern from the JDBC format to the
+   * DataNucleus/regex format. The schema pattern also treats an empty string as a wildcard.
+   */
+ protected String convertSchemaPattern(final String pattern) {
+ if ((pattern == null) || pattern.isEmpty()) {
+ return convertPattern("%", true);
+ } else {
+ return convertPattern(pattern, true);
+ }
+ }
+
+  /**
+   * Convert a pattern containing JDBC catalog search wildcards into
+   * Java regex patterns.
+   *
+   * The DataNucleus module expects the wildcard as '*', whereas the column search is
+   * done locally inside the Hive code and requires the regex wildcard format '.*'.
+   * The datanucleusFormat flag selects between the two.
+   *
+   * @param pattern input which may contain '%' or '_' wildcard characters, or
+   * these characters escaped using {@link #SEARCH_STRING_ESCAPE}.
+   * @return the pattern with %/_ replaced by the corresponding wildcard format and
+   * escaped characters unescaped.
+   */
+ private String convertPattern(final String pattern, boolean datanucleusFormat) {
+ String wStr;
+ if (datanucleusFormat) {
+ wStr = "*";
+ } else {
+ wStr = ".*";
+ }
+ return pattern
+ .replaceAll("([^\\\\])%", "$1" + wStr).replaceAll("\\\\%", "%").replaceAll("^%", wStr)
+ .replaceAll("([^\\\\])_", "$1.").replaceAll("\\\\_", "_").replaceAll("^_", ".");
+ }
+
+}
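
The effect of convertPattern on typical JDBC search patterns can be checked
with a standalone snippet. The replacement chain below copies the method body
verbatim; the expected outputs in the comments follow from it:

    public class PatternDemo {
      private static String convertPattern(final String pattern, boolean datanucleusFormat) {
        String wStr = datanucleusFormat ? "*" : ".*";
        return pattern
            .replaceAll("([^\\\\])%", "$1" + wStr).replaceAll("\\\\%", "%").replaceAll("^%", wStr)
            .replaceAll("([^\\\\])_", "$1.").replaceAll("\\\\_", "_").replaceAll("^_", ".");
      }

      public static void main(String[] args) {
        System.out.println(convertPattern("%", true));          // "*" (match everything)
        System.out.println(convertPattern("emp%", true));       // "emp*" ('%' -> '*')
        System.out.println(convertPattern("emp\\_name", true)); // "emp_name" (escaped '_' stays literal)
        System.out.println(convertPattern("_foo", false));      // ".foo" ('_' -> '.' single-char wildcard)
      }
    }
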
Added: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java?rev=1455659&view=auto
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java (added)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java Tue Mar 12 18:22:00 2013
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.service.cli.operation;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.service.cli.FetchOrientation;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.OperationHandle;
+import org.apache.hive.service.cli.OperationState;
+import org.apache.hive.service.cli.OperationType;
+import org.apache.hive.service.cli.RowSet;
+import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.session.HiveSession;
+
+
+
+public abstract class Operation {
+ private final HiveSession parentSession;
+ private OperationState state = OperationState.INITIALIZED;
+ private final OperationHandle opHandle;
+ private HiveConf configuration;
+ public static final Log LOG = LogFactory.getLog(Operation.class.getName());
+ public static final long DEFAULT_FETCH_MAX_ROWS = 100;
+ protected boolean hasResultSet;
+
+ protected Operation(HiveSession parentSession, OperationType opType) {
+ super();
+ this.parentSession = parentSession;
+ opHandle = new OperationHandle(opType);
+ }
+
+ public void setConfiguration(HiveConf configuration) {
+ this.configuration = new HiveConf(configuration);
+ }
+
+ public HiveConf getConfiguration() {
+ return new HiveConf(configuration);
+ }
+
+ public HiveSession getParentSession() {
+ return parentSession;
+ }
+
+ public OperationHandle getHandle() {
+ return opHandle;
+ }
+
+ public OperationType getType() {
+ return opHandle.getOperationType();
+ }
+
+ public OperationState getState() {
+ return state;
+ }
+
+ public boolean hasResultSet() {
+ return hasResultSet;
+ }
+
+ protected void setHasResultSet(boolean hasResultSet) {
+ this.hasResultSet = hasResultSet;
+ opHandle.setHasResultSet(hasResultSet);
+ }
+
+ protected final OperationState setState(OperationState newState) throws HiveSQLException {
+ state.validateTransition(newState);
+ this.state = newState;
+ return this.state;
+ }
+
+ protected final void assertState(OperationState state) throws HiveSQLException {
+ if (this.state != state) {
+ throw new HiveSQLException("Expected state " + state + ", but found " + this.state);
+ }
+ }
+
+ public boolean isRunning() {
+ return OperationState.RUNNING.equals(getState());
+ }
+
+ public boolean isFinished() {
+ return OperationState.FINISHED.equals(getState());
+ }
+
+ public boolean isCanceled() {
+ return OperationState.CANCELED.equals(getState());
+ }
+
+ public boolean isFailed() {
+ return OperationState.ERROR.equals(getState());
+ }
+
+ public abstract void run() throws HiveSQLException;
+
+ // TODO: make this abstract and implement in subclasses.
+ public void cancel() throws HiveSQLException {
+ setState(OperationState.CANCELED);
+    throw new UnsupportedOperationException("Operation.cancel()");
+ }
+
+ public abstract void close() throws HiveSQLException;
+
+ public abstract TableSchema getResultSetSchema() throws HiveSQLException;
+
+ public abstract RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException;
+
+ public RowSet getNextRowSet() throws HiveSQLException {
+ return getNextRowSet(FetchOrientation.FETCH_NEXT, DEFAULT_FETCH_MAX_ROWS);
+ }
+}
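
A concrete subclass makes the lifecycle contract visible: run() moves the
state from INITIALIZED through RUNNING to FINISHED (each setState call is a
validated transition), and fetches are only legal once FINISHED. A minimal
sketch against the API above; the one-row result and the OperationType
constant are illustrative assumptions, not part of this commit:

    import org.apache.hive.service.cli.FetchOrientation;
    import org.apache.hive.service.cli.HiveSQLException;
    import org.apache.hive.service.cli.OperationState;
    import org.apache.hive.service.cli.OperationType;
    import org.apache.hive.service.cli.RowSet;
    import org.apache.hive.service.cli.TableSchema;
    import org.apache.hive.service.cli.operation.Operation;
    import org.apache.hive.service.cli.session.HiveSession;

    // Hypothetical one-row operation; for illustration only.
    public class HelloOperation extends Operation {
      private final TableSchema schema = new TableSchema();

      protected HelloOperation(HiveSession parentSession) {
        // OperationType.EXECUTE_STATEMENT is assumed; any value works for the sketch.
        super(parentSession, OperationType.EXECUTE_STATEMENT);
      }

      @Override
      public void run() throws HiveSQLException {
        setState(OperationState.RUNNING);   // INITIALIZED -> RUNNING (validated)
        setHasResultSet(true);              // marks the handle as having results
        setState(OperationState.FINISHED);  // RUNNING -> FINISHED
      }

      @Override
      public void close() throws HiveSQLException {
        setState(OperationState.CLOSED);
      }

      @Override
      public TableSchema getResultSetSchema() throws HiveSQLException {
        return schema;
      }

      @Override
      public RowSet getNextRowSet(FetchOrientation orientation, long maxRows)
          throws HiveSQLException {
        assertState(OperationState.FINISHED); // fetching before run() completes is an error
        RowSet rowSet = new RowSet();
        rowSet.addRow(schema, new String[] {"hello"});
        return rowSet;
      }
    }
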
Added: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java?rev=1455659&view=auto
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java (added)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java Tue Mar 12 18:22:00 2013
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.operation;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.service.AbstractService;
+import org.apache.hive.service.cli.FetchOrientation;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.OperationHandle;
+import org.apache.hive.service.cli.OperationState;
+import org.apache.hive.service.cli.RowSet;
+import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.session.HiveSession;
+
+/**
+ * OperationManager.
+ *
+ */
+public class OperationManager extends AbstractService {
+
+ private HiveConf hiveConf;
+ private final Map<OperationHandle, Operation> handleToOperation =
+ new HashMap<OperationHandle, Operation>();
+
+ public OperationManager() {
+ super("OperationManager");
+ }
+
+ @Override
+ public synchronized void init(HiveConf hiveConf) {
+ this.hiveConf = hiveConf;
+
+ super.init(hiveConf);
+ }
+
+ @Override
+ public synchronized void start() {
+ super.start();
+ // TODO
+ }
+
+ @Override
+ public synchronized void stop() {
+ // TODO
+ super.stop();
+ }
+
+ public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession,
+ String statement, Map<String, String> confOverlay) {
+ ExecuteStatementOperation executeStatementOperation = ExecuteStatementOperation
+ .newExecuteStatementOperation(parentSession, statement, confOverlay);
+ addOperation(executeStatementOperation);
+ return executeStatementOperation;
+ }
+
+ public GetTypeInfoOperation newGetTypeInfoOperation(HiveSession parentSession) {
+ GetTypeInfoOperation operation = new GetTypeInfoOperation(parentSession);
+ addOperation(operation);
+ return operation;
+ }
+
+ public GetCatalogsOperation newGetCatalogsOperation(HiveSession parentSession) {
+ GetCatalogsOperation operation = new GetCatalogsOperation(parentSession);
+ addOperation(operation);
+ return operation;
+ }
+
+ public GetSchemasOperation newGetSchemasOperation(HiveSession parentSession,
+ String catalogName, String schemaName) {
+ GetSchemasOperation operation = new GetSchemasOperation(parentSession, catalogName, schemaName);
+ addOperation(operation);
+ return operation;
+ }
+
+ public MetadataOperation newGetTablesOperation(HiveSession parentSession,
+ String catalogName, String schemaName, String tableName,
+ List<String> tableTypes) {
+ MetadataOperation operation =
+ new GetTablesOperation(parentSession, catalogName, schemaName, tableName, tableTypes);
+ addOperation(operation);
+ return operation;
+ }
+
+ public GetTableTypesOperation newGetTableTypesOperation(HiveSession parentSession) {
+ GetTableTypesOperation operation = new GetTableTypesOperation(parentSession);
+ addOperation(operation);
+ return operation;
+ }
+
+ public GetColumnsOperation newGetColumnsOperation(HiveSession parentSession,
+ String catalogName, String schemaName, String tableName, String columnName) {
+ GetColumnsOperation operation = new GetColumnsOperation(parentSession,
+ catalogName, schemaName, tableName, columnName);
+ addOperation(operation);
+ return operation;
+ }
+
+ public GetFunctionsOperation newGetFunctionsOperation(HiveSession parentSession,
+ String catalogName, String schemaName, String functionName) {
+ GetFunctionsOperation operation = new GetFunctionsOperation(parentSession,
+ catalogName, schemaName, functionName);
+ addOperation(operation);
+ return operation;
+ }
+
+ public synchronized Operation getOperation(OperationHandle operationHandle) throws HiveSQLException {
+ Operation operation = handleToOperation.get(operationHandle);
+ if (operation == null) {
+ throw new HiveSQLException("Invalid OperationHandle: " + operationHandle);
+ }
+ return operation;
+ }
+
+ private synchronized void addOperation(Operation operation) {
+ handleToOperation.put(operation.getHandle(), operation);
+ }
+
+ private synchronized Operation removeOperation(OperationHandle opHandle) {
+ return handleToOperation.remove(opHandle);
+ }
+
+ public OperationState getOperationState(OperationHandle opHandle) throws HiveSQLException {
+ return getOperation(opHandle).getState();
+ }
+
+ public void cancelOperation(OperationHandle opHandle) throws HiveSQLException {
+ getOperation(opHandle).cancel();
+ }
+
+ public void closeOperation(OperationHandle opHandle) throws HiveSQLException {
+ Operation operation = removeOperation(opHandle);
+ if (operation == null) {
+ throw new HiveSQLException("Operation does not exist!");
+ }
+ operation.close();
+ }
+
+ public TableSchema getOperationResultSetSchema(OperationHandle opHandle)
+ throws HiveSQLException {
+ return getOperation(opHandle).getResultSetSchema();
+ }
+
+ public RowSet getOperationNextRowSet(OperationHandle opHandle) throws HiveSQLException {
+ return getOperation(opHandle).getNextRowSet();
+ }
+
+ public RowSet getOperationNextRowSet(OperationHandle opHandle,
+ FetchOrientation orientation, long maxRows)
+ throws HiveSQLException {
+ return getOperation(opHandle).getNextRowSet(orientation, maxRows);
+ }
+}
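
Putting OperationManager together with a session, the intended flow is: create
the operation, run it, then drive fetches and cleanup through the handle. A
sketch assuming a working Hive client configuration on the classpath; the
statement and variable names are illustrative:

    import java.util.HashMap;

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hive.service.cli.HiveSQLException;
    import org.apache.hive.service.cli.OperationHandle;
    import org.apache.hive.service.cli.RowSet;
    import org.apache.hive.service.cli.operation.ExecuteStatementOperation;
    import org.apache.hive.service.cli.operation.OperationManager;
    import org.apache.hive.service.cli.session.HiveSession;
    import org.apache.hive.service.cli.session.HiveSessionImpl;

    public class OperationManagerDemo {
      public static void main(String[] args) throws HiveSQLException {
        HiveConf hiveConf = new HiveConf();

        OperationManager operationManager = new OperationManager();
        operationManager.init(hiveConf);
        operationManager.start();

        // A session wired to the manager, as SessionManager would normally do.
        HiveSession session = new HiveSessionImpl("anonymous", "", new HashMap<String, String>());
        session.setOperationManager(operationManager);

        ExecuteStatementOperation op = operationManager.newExecuteStatementOperation(
            session, "SHOW TABLES", new HashMap<String, String>());
        op.run();

        // Fetch through the handle rather than the operation object.
        OperationHandle handle = op.getHandle();
        RowSet rows = operationManager.getOperationNextRowSet(handle);
        System.out.println("fetched " + rows + " ; state = "
            + operationManager.getOperationState(handle));

        operationManager.closeOperation(handle);
        operationManager.stop();
      }
    }
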
Added: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java?rev=1455659&view=auto
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java (added)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java Tue Mar 12 18:22:00 2013
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.operation;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.parse.VariableSubstitution;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.service.cli.FetchOrientation;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.OperationState;
+import org.apache.hive.service.cli.RowSet;
+import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.session.HiveSession;
+
+/**
+ * SQLOperation.
+ *
+ */
+public class SQLOperation extends ExecuteStatementOperation {
+
+ private Driver driver = null;
+ private CommandProcessorResponse response;
+ private TableSchema resultSchema = null;
+ private Schema mResultSchema = null;
+ private SerDe serde = null;
+
+
+ public SQLOperation(HiveSession parentSession, String statement, Map<String, String> confOverlay) {
+ // TODO: call setRemoteUser in ExecuteStatementOperation or higher.
+ super(parentSession, statement, confOverlay);
+ }
+
+
+ public void prepare() throws HiveSQLException {
+ }
+
+ @Override
+ public void run() throws HiveSQLException {
+ setState(OperationState.RUNNING);
+
+ try {
+ driver = new Driver(getParentSession().getHiveConf());
+ // In Hive server mode, we are not able to retry in the FetchTask
+ // case, when calling fetch queries since execute() has returned.
+ // For now, we disable the test attempts.
+ driver.setTryCount(Integer.MAX_VALUE);
+
+ String subStatement = new VariableSubstitution().substitute(getParentSession().getHiveConf(), statement);
+
+ response = driver.run(subStatement);
+ if (0 != response.getResponseCode()) {
+ throw new HiveSQLException("Error while processing statement: "
+ + response.getErrorMessage(), response.getSQLState(), response.getResponseCode());
+ }
+
+ mResultSchema = driver.getSchema();
+ if (mResultSchema != null && mResultSchema.isSetFieldSchemas()) {
+ resultSchema = new TableSchema(mResultSchema);
+ setHasResultSet(true);
+ } else {
+ setHasResultSet(false);
+ }
+ } catch (HiveSQLException e) {
+ setState(OperationState.ERROR);
+ throw e;
+ } catch (Exception e) {
+ setState(OperationState.ERROR);
+ throw new HiveSQLException("Error running query: " + e.toString());
+ }
+ setState(OperationState.FINISHED);
+ }
+
+ @Override
+ public void cancel() throws HiveSQLException {
+ setState(OperationState.CANCELED);
+ if (driver != null) {
+ driver.close();
+ driver.destroy();
+ }
+
+ SessionState session = SessionState.get();
+ if (session.getTmpOutputFile() != null) {
+ session.getTmpOutputFile().delete();
+ }
+ }
+
+ @Override
+ public void close() throws HiveSQLException {
+ setState(OperationState.CLOSED);
+ if (driver != null) {
+ driver.close();
+ driver.destroy();
+ }
+
+ SessionState session = SessionState.get();
+ if (session.getTmpOutputFile() != null) {
+ session.getTmpOutputFile().delete();
+ }
+ }
+
+ @Override
+ public TableSchema getResultSetSchema() throws HiveSQLException {
+ assertState(OperationState.FINISHED);
+ if (resultSchema == null) {
+ resultSchema = new TableSchema(driver.getSchema());
+ }
+ return resultSchema;
+ }
+
+
+ @Override
+ public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
+ assertState(OperationState.FINISHED);
+ ArrayList<String> rows = new ArrayList<String>();
+ driver.setMaxRows((int)maxRows);
+
+ try {
+ driver.getResults(rows);
+
+ getSerDe();
+ StructObjectInspector soi = (StructObjectInspector) serde.getObjectInspector();
+ List<? extends StructField> fieldRefs = soi.getAllStructFieldRefs();
+ RowSet rowSet = new RowSet();
+
+ Object[] deserializedFields = new Object[fieldRefs.size()];
+ Object rowObj;
+ ObjectInspector fieldOI;
+
+ for (String rowString : rows) {
+ rowObj = serde.deserialize(new BytesWritable(rowString.getBytes()));
+ for (int i = 0; i < fieldRefs.size(); i++) {
+ StructField fieldRef = fieldRefs.get(i);
+ fieldOI = fieldRef.getFieldObjectInspector();
+ deserializedFields[i] = convertLazyToJava(soi.getStructFieldData(rowObj, fieldRef), fieldOI);
+ }
+ rowSet.addRow(resultSchema, deserializedFields);
+ }
+ return rowSet;
+ } catch (IOException e) {
+ throw new HiveSQLException(e);
+ } catch (CommandNeedRetryException e) {
+ throw new HiveSQLException(e);
+ } catch (Exception e) {
+ throw new HiveSQLException(e);
+ }
+ }
+
+ /**
+ * Convert a LazyObject to a standard Java object in compliance with JDBC 3.0 (see JDBC 3.0
+ * Specification, Table B-3: Mapping from JDBC Types to Java Object Types).
+ *
+ * This method is kept consistent with {@link HiveResultSetMetaData#hiveTypeToSqlType}.
+ */
+ private static Object convertLazyToJava(Object o, ObjectInspector oi) {
+ Object obj = ObjectInspectorUtils.copyToStandardObject(o, oi, ObjectInspectorCopyOption.JAVA);
+
+ // for now, expose non-primitive as a string
+ // TODO: expose non-primitive as a structured object while maintaining JDBC compliance
+ if (obj != null && oi.getCategory() != ObjectInspector.Category.PRIMITIVE) {
+ obj = obj.toString();
+ }
+
+ return obj;
+ }
+
+
+ private SerDe getSerDe() throws SQLException {
+ if (serde != null) {
+ return serde;
+ }
+ try {
+ List<FieldSchema> fieldSchemas = mResultSchema.getFieldSchemas();
+ List<String> columnNames = new ArrayList<String>();
+ List<String> columnTypes = new ArrayList<String>();
+ StringBuilder namesSb = new StringBuilder();
+ StringBuilder typesSb = new StringBuilder();
+
+ if (fieldSchemas != null && !fieldSchemas.isEmpty()) {
+ for (int pos = 0; pos < fieldSchemas.size(); pos++) {
+ if (pos != 0) {
+ namesSb.append(",");
+ typesSb.append(",");
+ }
+ columnNames.add(fieldSchemas.get(pos).getName());
+ columnTypes.add(fieldSchemas.get(pos).getType());
+ namesSb.append(fieldSchemas.get(pos).getName());
+ typesSb.append(fieldSchemas.get(pos).getType());
+ }
+ }
+ String names = namesSb.toString();
+ String types = typesSb.toString();
+
+ serde = new LazySimpleSerDe();
+ Properties props = new Properties();
+ if (names.length() > 0) {
+ LOG.debug("Column names: " + names);
+ props.setProperty(serdeConstants.LIST_COLUMNS, names);
+ }
+ if (types.length() > 0) {
+ LOG.debug("Column types: " + types);
+ props.setProperty(serdeConstants.LIST_COLUMN_TYPES, types);
+ }
+ serde.initialize(new HiveConf(), props);
+
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ throw new SQLException("Could not create ResultSet: " + ex.getMessage(), ex);
+ }
+ return serde;
+ }
+
+}
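
The fetch path above (getSerDe configuring a LazySimpleSerDe from the result
schema, getNextRowSet deserializing each line Driver returns) can be exercised
in isolation. A sketch using only the serde APIs already referenced above; the
two-column schema is made up, and the ctrl-A ('\001') field delimiter is
assumed to be LazySimpleSerDe's default:

    import java.util.Properties;

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.serde.serdeConstants;
    import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
    import org.apache.hadoop.hive.serde2.objectinspector.StructField;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    import org.apache.hadoop.io.BytesWritable;

    public class SerDeDemo {
      public static void main(String[] args) throws Exception {
        // Configure the serde from comma-separated column names/types, as getSerDe() does.
        Properties props = new Properties();
        props.setProperty(serdeConstants.LIST_COLUMNS, "name,age");
        props.setProperty(serdeConstants.LIST_COLUMN_TYPES, "string,int");

        LazySimpleSerDe serde = new LazySimpleSerDe();
        serde.initialize(new HiveConf(), props);

        // Deserialize one fetched line, assuming the default ctrl-A field delimiter.
        Object rowObj = serde.deserialize(new BytesWritable("alice\00130".getBytes()));

        // Walk the fields through the object inspector, as getNextRowSet does.
        StructObjectInspector soi = (StructObjectInspector) serde.getObjectInspector();
        for (StructField fieldRef : soi.getAllStructFieldRefs()) {
          System.out.println(fieldRef.getFieldName() + " = "
              + soi.getStructFieldData(rowObj, fieldRef));
        }
      }
    }
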
Added: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SetOperation.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SetOperation.java?rev=1455659&view=auto
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SetOperation.java (added)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SetOperation.java Tue Mar 12 18:22:00 2013
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.operation;
+
+import java.util.Map;
+
+import org.apache.hadoop.hive.ql.processors.SetProcessor;
+import org.apache.hive.service.cli.session.HiveSession;
+
+/**
+ * SetOperation.
+ *
+ */
+public class SetOperation extends HiveCommandOperation {
+
+ protected SetOperation(HiveSession parentSession, String statement,
+ Map<String, String> confOverlay) {
+ super(parentSession, statement, confOverlay);
+ setCommandProcessor(new SetProcessor());
+ }
+
+}
Added: hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSession.java?rev=1455659&view=auto
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSession.java (added)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSession.java Tue Mar 12 18:22:00 2013
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.session;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.service.cli.FetchOrientation;
+import org.apache.hive.service.cli.GetInfoType;
+import org.apache.hive.service.cli.GetInfoValue;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.OperationHandle;
+import org.apache.hive.service.cli.RowSet;
+import org.apache.hive.service.cli.SessionHandle;
+import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.operation.OperationManager;
+
+public interface HiveSession {
+ /**
+ * Set the session manager for the session
+ * @param sessionManager
+ */
+ public void setSessionManager(SessionManager sessionManager);
+
+ /**
+ * Set operation manager for the session
+ * @param operationManager
+ */
+ public void setOperationManager(OperationManager operationManager);
+
+ public SessionHandle getSessionHandle();
+
+ public String getUsername();
+
+ public String getPassword();
+
+ public HiveConf getHiveConf();
+
+ public IMetaStoreClient getMetaStoreClient() throws HiveSQLException;
+
+ /**
+ * getInfo operation handler
+ * @param getInfoType
+ * @return
+ * @throws HiveSQLException
+ */
+ public GetInfoValue getInfo(GetInfoType getInfoType) throws HiveSQLException;
+
+ /**
+ * execute operation handler
+ * @param statement
+ * @param confOverlay
+ * @return
+ * @throws HiveSQLException
+ */
+ public OperationHandle executeStatement(String statement,
+ Map<String, String> confOverlay) throws HiveSQLException;
+
+ /**
+ * getTypeInfo operation handler
+ * @return
+ * @throws HiveSQLException
+ */
+ public OperationHandle getTypeInfo() throws HiveSQLException;
+
+ /**
+ * getCatalogs operation handler
+ * @return
+ * @throws HiveSQLException
+ */
+ public OperationHandle getCatalogs() throws HiveSQLException;
+
+ /**
+ * getSchemas operation handler
+ * @param catalogName
+ * @param schemaName
+ * @return
+ * @throws HiveSQLException
+ */
+ public OperationHandle getSchemas(String catalogName, String schemaName)
+ throws HiveSQLException;
+
+ /**
+ * getTables operation handler
+ * @param catalogName
+ * @param schemaName
+ * @param tableName
+ * @param tableTypes
+ * @return
+ * @throws HiveSQLException
+ */
+ public OperationHandle getTables(String catalogName, String schemaName,
+ String tableName, List<String> tableTypes) throws HiveSQLException;
+
+ /**
+ * getTableTypes operation handler
+ * @return
+ * @throws HiveSQLException
+ */
+  public OperationHandle getTableTypes() throws HiveSQLException;
+
+ /**
+ * getColumns operation handler
+ * @param catalogName
+ * @param schemaName
+ * @param tableName
+ * @param columnName
+ * @return
+ * @throws HiveSQLException
+ */
+ public OperationHandle getColumns(String catalogName, String schemaName,
+ String tableName, String columnName) throws HiveSQLException;
+
+ /**
+ * getFunctions operation handler
+ * @param catalogName
+ * @param schemaName
+ * @param functionName
+ * @return
+ * @throws HiveSQLException
+ */
+ public OperationHandle getFunctions(String catalogName, String schemaName,
+ String functionName) throws HiveSQLException;
+
+ /**
+ * close the session
+ * @throws HiveSQLException
+ */
+ public void close() throws HiveSQLException;
+
+ public void cancelOperation(OperationHandle opHandle) throws HiveSQLException;
+
+ public void closeOperation(OperationHandle opHandle) throws HiveSQLException;
+
+ public TableSchema getResultSetMetadata(OperationHandle opHandle)
+ throws HiveSQLException;
+
+ public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows)
+ throws HiveSQLException;
+
+ public RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException;
+
+ public SessionState getSessionState();
+
+ public String getUserName();
+
+ public void setUserName(String userName);
+}
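
From a caller's perspective, the interface composes into the usual
execute / inspect / fetch / close sequence. A sketch of a client-side flow,
assuming an already-constructed session; the helper name is illustrative:

    import java.util.HashMap;

    import org.apache.hive.service.cli.FetchOrientation;
    import org.apache.hive.service.cli.HiveSQLException;
    import org.apache.hive.service.cli.OperationHandle;
    import org.apache.hive.service.cli.RowSet;
    import org.apache.hive.service.cli.TableSchema;
    import org.apache.hive.service.cli.session.HiveSession;

    public class HiveSessionFlow {
      // Execute a statement, inspect its schema, fetch one batch, release the handle.
      static void runStatement(HiveSession session, String sql) throws HiveSQLException {
        OperationHandle handle = session.executeStatement(sql, new HashMap<String, String>());
        try {
          TableSchema schema = session.getResultSetMetadata(handle);
          System.out.println("result schema: " + schema);

          // Explicit-orientation overload; fetchResults(handle) would use the
          // FETCH_NEXT / DEFAULT_FETCH_MAX_ROWS defaults instead.
          RowSet rows = session.fetchResults(handle, FetchOrientation.FETCH_NEXT, 100);
          System.out.println("first batch: " + rows);
        } finally {
          session.closeOperation(handle);
        }
      }
    }
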
Added: hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java?rev=1455659&view=auto
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java (added)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java Tue Mar 12 18:22:00 2013
@@ -0,0 +1,344 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.session;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.service.cli.FetchOrientation;
+import org.apache.hive.service.cli.GetInfoType;
+import org.apache.hive.service.cli.GetInfoValue;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.OperationHandle;
+import org.apache.hive.service.cli.RowSet;
+import org.apache.hive.service.cli.SessionHandle;
+import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.operation.ExecuteStatementOperation;
+import org.apache.hive.service.cli.operation.GetCatalogsOperation;
+import org.apache.hive.service.cli.operation.GetColumnsOperation;
+import org.apache.hive.service.cli.operation.GetFunctionsOperation;
+import org.apache.hive.service.cli.operation.GetSchemasOperation;
+import org.apache.hive.service.cli.operation.GetTableTypesOperation;
+import org.apache.hive.service.cli.operation.GetTypeInfoOperation;
+import org.apache.hive.service.cli.operation.MetadataOperation;
+import org.apache.hive.service.cli.operation.OperationManager;
+
+/**
+ * HiveSessionImpl.
+ *
+ */
+public class HiveSessionImpl implements HiveSession {
+
+ private final SessionHandle sessionHandle = new SessionHandle();
+ private String username;
+ private final String password;
+ private final Map<String, String> sessionConf = new HashMap<String, String>();
+ private final HiveConf hiveConf = new HiveConf();
+ private final SessionState sessionState;
+
+ private static final String FETCH_WORK_SERDE_CLASS =
+ "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
+
+ private SessionManager sessionManager;
+ private OperationManager operationManager;
+ private IMetaStoreClient metastoreClient = null;
+
+ public HiveSessionImpl(String username, String password, Map<String, String> sessionConf) {
+ this.username = username;
+ this.password = password;
+
+ if (sessionConf != null) {
+ for (Map.Entry<String, String> entry : sessionConf.entrySet()) {
+ hiveConf.set(entry.getKey(), entry.getValue());
+ }
+ }
+
+ sessionState = new SessionState(hiveConf);
+ }
+
+ private SessionManager getSessionManager() {
+ return sessionManager;
+ }
+
+ public void setSessionManager(SessionManager sessionManager) {
+ this.sessionManager = sessionManager;
+ }
+
+ private OperationManager getOperationManager() {
+ return operationManager;
+ }
+
+ public void setOperationManager(OperationManager operationManager) {
+ this.operationManager = operationManager;
+ }
+
+ protected synchronized void acquire() throws HiveSQLException {
+ SessionState.start(sessionState);
+ }
+
+ protected synchronized void release() {
+ assert sessionState != null;
+ // no need to release sessionState...
+ }
+
+ public SessionHandle getSessionHandle() {
+ return sessionHandle;
+ }
+
+ public String getUsername() {
+ return username;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public HiveConf getHiveConf() {
+ hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHOUTPUTSERDE, FETCH_WORK_SERDE_CLASS);
+ return hiveConf;
+ }
+
+ public IMetaStoreClient getMetaStoreClient() throws HiveSQLException {
+ if (metastoreClient == null) {
+ try {
+ metastoreClient = new HiveMetaStoreClient(getHiveConf());
+ } catch (MetaException e) {
+ throw new HiveSQLException(e);
+ }
+ }
+ return metastoreClient;
+ }
+
+ public GetInfoValue getInfo(GetInfoType getInfoType)
+ throws HiveSQLException {
+ acquire();
+ try {
+ switch (getInfoType) {
+ case CLI_SERVER_NAME:
+ return new GetInfoValue("Hive");
+ case CLI_DBMS_NAME:
+ return new GetInfoValue("Apache Hive");
+ case CLI_DBMS_VER:
+ return new GetInfoValue("0.10.0");
+ case CLI_MAX_COLUMN_NAME_LEN:
+ return new GetInfoValue(128);
+ case CLI_MAX_SCHEMA_NAME_LEN:
+ return new GetInfoValue(128);
+ case CLI_MAX_TABLE_NAME_LEN:
+ return new GetInfoValue(128);
+ case CLI_TXN_CAPABLE:
+ default:
+ throw new HiveSQLException("Unrecognized GetInfoType value: " + getInfoType.toString());
+ }
+ } finally {
+ release();
+ }
+ }
+
+ public OperationHandle executeStatement(String statement, Map<String, String> confOverlay)
+ throws HiveSQLException {
+ acquire();
+ try {
+ ExecuteStatementOperation operation = getOperationManager()
+ .newExecuteStatementOperation(getSession(), statement, confOverlay);
+ operation.run();
+ return operation.getHandle();
+ } finally {
+ release();
+ }
+ }
+
+ public OperationHandle getTypeInfo()
+ throws HiveSQLException {
+ acquire();
+ try {
+ GetTypeInfoOperation operation = getOperationManager().newGetTypeInfoOperation(getSession());
+ operation.run();
+ return operation.getHandle();
+ } finally {
+ release();
+ }
+ }
+
+ public OperationHandle getCatalogs()
+ throws HiveSQLException {
+ acquire();
+ try {
+ GetCatalogsOperation operation = getOperationManager().newGetCatalogsOperation(getSession());
+ operation.run();
+ return operation.getHandle();
+ } finally {
+ release();
+ }
+ }
+
+ public OperationHandle getSchemas(String catalogName, String schemaName)
+ throws HiveSQLException {
+ acquire();
+ try {
+ GetSchemasOperation operation =
+ getOperationManager().newGetSchemasOperation(getSession(), catalogName, schemaName);
+ operation.run();
+ return operation.getHandle();
+ } finally {
+ release();
+ }
+ }
+
+ public OperationHandle getTables(String catalogName, String schemaName, String tableName,
+ List<String> tableTypes)
+ throws HiveSQLException {
+ acquire();
+ try {
+ MetadataOperation operation =
+ getOperationManager().newGetTablesOperation(getSession(), catalogName, schemaName, tableName, tableTypes);
+ operation.run();
+ return operation.getHandle();
+ } finally {
+ release();
+ }
+ }
+
+ public OperationHandle getTableTypes()
+ throws HiveSQLException {
+ acquire();
+ try {
+ GetTableTypesOperation operation = getOperationManager().newGetTableTypesOperation(getSession());
+ operation.run();
+ return operation.getHandle();
+ } finally {
+ release();
+ }
+ }
+
+ public OperationHandle getColumns(String catalogName, String schemaName,
+ String tableName, String columnName) throws HiveSQLException {
+ acquire();
+ try {
+ GetColumnsOperation operation = getOperationManager().newGetColumnsOperation(getSession(),
+ catalogName, schemaName, tableName, columnName);
+ operation.run();
+ return operation.getHandle();
+ } finally {
+ release();
+ }
+ }
+
+ public OperationHandle getFunctions(String catalogName, String schemaName, String functionName)
+ throws HiveSQLException {
+ acquire();
+ try {
+ GetFunctionsOperation operation = getOperationManager()
+ .newGetFunctionsOperation(getSession(), catalogName, schemaName, functionName);
+ operation.run();
+ return operation.getHandle();
+ } finally {
+ release();
+ }
+ }
+
+ public void close() throws HiveSQLException {
+ try {
+ acquire();
+ /**
+ * For metadata operations like getTables(), getColumns() etc,
+ * the session allocates a private metastore handler which should be
+ * closed at the end of the session
+ */
+ if (metastoreClient != null) {
+ metastoreClient.close();
+ }
+ } finally {
+ release();
+ }
+ }
+
+ public SessionState getSessionState() {
+ return sessionState;
+ }
+
+ public String getUserName() {
+ return username;
+  }
+
+  public void setUserName(String userName) {
+ this.username = userName;
+ }
+
+ @Override
+ public void cancelOperation(OperationHandle opHandle) throws HiveSQLException {
+ acquire();
+ try {
+ sessionManager.getOperationManager().cancelOperation(opHandle);
+ } finally {
+ release();
+ }
+ }
+
+ @Override
+ public void closeOperation(OperationHandle opHandle) throws HiveSQLException {
+ acquire();
+ try {
+ sessionManager.getOperationManager().closeOperation(opHandle);
+ } finally {
+ release();
+ }
+ }
+
+ @Override
+ public TableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException {
+ acquire();
+ try {
+ return sessionManager.getOperationManager().getOperationResultSetSchema(opHandle);
+ } finally {
+ release();
+ }
+ }
+
+ @Override
+ public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows)
+ throws HiveSQLException {
+ acquire();
+ try {
+ return sessionManager.getOperationManager()
+ .getOperationNextRowSet(opHandle, orientation, maxRows);
+ } finally {
+ release();
+ }
+ }
+
+ @Override
+ public RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException {
+ acquire();
+ try {
+ return sessionManager.getOperationManager().getOperationNextRowSet(opHandle);
+ } finally {
+ release();
+ }
+ }
+
+ protected HiveSession getSession() {
+ return this;
+ }
+}
Added: hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java?rev=1455659&view=auto
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java (added)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java Tue Mar 12 18:22:00 2013
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.session;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.service.cli.HiveSQLException;
+
+/**
+ * HiveSessionImplwithUGI.
+ *
+ * HiveSession backed by the connecting user's UGI and, if required, a delegation token.
+ */
+public class HiveSessionImplwithUGI extends HiveSessionImpl {
+ public static final String HS2TOKEN = "HiveServer2ImpersonationToken";
+
+ private UserGroupInformation sessionUgi = null;
+ private String delegationTokenStr = null;
+ private Hive sessionHive = null;
+ private HiveSession proxySession = null;
+
+ public HiveSessionImplwithUGI(String username, String password, Map<String, String> sessionConf,
+ String delegationToken) throws HiveSQLException {
+ super(username, password, sessionConf);
+ setSessionUGI(username);
+ setUserPath(username);
+ setDelegationToken(delegationToken);
+ }
+
+ // setup appropriate UGI for the session
+ public void setSessionUGI(String owner) throws HiveSQLException {
+ if (owner == null) {
+ throw new HiveSQLException("No username provided for impersonation");
+ }
+ if (ShimLoader.getHadoopShims().isSecurityEnabled()) {
+ try {
+ sessionUgi = ShimLoader.getHadoopShims().createProxyUser(owner);
+ } catch (IOException e) {
+ throw new HiveSQLException("Couldn't setup proxy user", e);
+ }
+ } else {
+ sessionUgi = ShimLoader.getHadoopShims().createRemoteUser(owner, null);
+ }
+ }
+
+ public UserGroupInformation getSessionUgi() {
+ return this.sessionUgi;
+ }
+
+  public String getDelegationToken() {
+ return this.delegationTokenStr;
+ }
+
+ @Override
+ protected synchronized void acquire() throws HiveSQLException {
+ super.acquire();
+ // if we have a metastore connection with impersonation, then set it first
+ if (sessionHive != null) {
+ Hive.set(sessionHive);
+ }
+ }
+
+  /**
+   * Close the file systems for the session, cancel the session's delegation token,
+   * and close the metastore connection.
+   */
+ @Override
+ public void close() throws HiveSQLException {
+ try {
+ acquire();
+ ShimLoader.getHadoopShims().closeAllForUGI(sessionUgi);
+ cancelDelegationToken();
+ } finally {
+ release();
+ super.close();
+ }
+ }
+
+  /**
+   * Enable the delegation token for the session: save the token string and set the
+   * token.signature in the Hive conf. The metastore client uses this token.signature
+   * to determine whether to use Kerberos or the delegation token.
+   * @throws HiveSQLException
+   */
+ private void setDelegationToken(String delegationTokenStr) throws HiveSQLException {
+ this.delegationTokenStr = delegationTokenStr;
+ if (delegationTokenStr != null) {
+ getHiveConf().set("hive.metastore.token.signature", HS2TOKEN);
+ try {
+ ShimLoader.getHadoopShims().setTokenStr(sessionUgi, delegationTokenStr, HS2TOKEN);
+ } catch (IOException e) {
+ throw new HiveSQLException("Couldn't setup delegation token in the ugi", e);
+ }
+ // create a new metastore connection using the delegation token
+ Hive.set(null);
+ try {
+ sessionHive = Hive.get(getHiveConf());
+ } catch (HiveException e) {
+ throw new HiveSQLException("Failed to setup metastore connection", e);
+ }
+ }
+ }
+
+ // If the session has a delegation token obtained from the metastore, then cancel it
+ private void cancelDelegationToken() throws HiveSQLException {
+ if (delegationTokenStr != null) {
+ try {
+ Hive.get(getHiveConf()).cancelDelegationToken(delegationTokenStr);
+ } catch (HiveException e) {
+ throw new HiveSQLException("Couldn't cancel delegation token", e);
+ }
+ // close the metastore connection created with this delegation token
+ Hive.closeCurrent();
+ }
+ }
+
+ // Append the user name to temp/scratch directory path for each impersonated user
+ private void setUserPath(String userName) {
+ for (HiveConf.ConfVars var: HiveConf.userVars) {
+ String userVar = getHiveConf().getVar(var);
+ if (userVar != null) {
+ // If there's a path separator at end then remove it
+ if (userVar.endsWith(File.separator)) {
+          userVar = userVar.substring(0, userVar.length() - 1);
+ }
+ getHiveConf().setVar(var, userVar + "-" + userName);
+ }
+ }
+ }
+
+ @Override
+ protected HiveSession getSession() {
+ assert proxySession != null;
+
+ return proxySession;
+ }
+
+ public void setProxySession(HiveSession proxySession) {
+ this.proxySession = proxySession;
+ }
+
+
+}
Added: hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionProxy.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionProxy.java?rev=1455659&view=auto
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionProxy.java (added)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionProxy.java Tue Mar 12 18:22:00 2013
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.session;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.service.cli.HiveSQLException;
+
+public class HiveSessionProxy implements InvocationHandler {
+ private final HiveSession base;
+ private final UserGroupInformation ugi;
+
+ public HiveSessionProxy(HiveSession hiveSession, UserGroupInformation ugi) {
+ this.base = hiveSession;
+ this.ugi = ugi;
+ }
+
+ public static HiveSession getProxy(HiveSession hiveSession, UserGroupInformation ugi)
+ throws IllegalArgumentException, HiveSQLException {
+ return (HiveSession)Proxy.newProxyInstance(HiveSession.class.getClassLoader(),
+ new Class<?>[] {HiveSession.class},
+ new HiveSessionProxy(hiveSession, ugi));
+ }
+
+ @Override
+ public Object invoke(Object proxy, final Method method, final Object[] args)
+ throws Throwable {
+ try {
+ return ShimLoader.getHadoopShims().doAs(ugi,
+ new PrivilegedExceptionAction<Object> () {
+ @Override
+ public Object run() throws HiveSQLException {
+ try {
+ return method.invoke(base, args);
+ } catch (InvocationTargetException e) {
+ if (e.getCause() instanceof HiveSQLException) {
+ throw (HiveSQLException)e.getCause();
+ } else {
+ throw new RuntimeException(e.getCause());
+ }
+ } catch (IllegalArgumentException e) {
+ throw new RuntimeException(e);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ } catch (UndeclaredThrowableException e) {
+ Throwable innerException = e.getCause();
+ if (innerException instanceof PrivilegedActionException) {
+ throw innerException.getCause();
+ } else {
+ throw e.getCause();
+ }
+ }
+ }
+
+}
+
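
HiveSessionProxy routes every HiveSession call through invoke(), runs it
under ugi.doAs(), and unwraps InvocationTargetException so callers see the
original HiveSQLException. The same dynamic-proxy pattern in miniature,
minus the Hadoop UGI dependency (the Greeter interface is invented for
illustration):

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class ProxyExample {
  interface Greeter {
    String greet(String name);
  }

  public static void main(String[] args) {
    final Greeter base = new Greeter() {
      public String greet(String name) { return "hello, " + name; }
    };
    // Every call on the proxy funnels through invoke(); this is the spot
    // where HiveSessionProxy wraps the call in ugi.doAs(...).
    Greeter proxy = (Greeter) Proxy.newProxyInstance(
        Greeter.class.getClassLoader(),
        new Class<?>[] { Greeter.class },
        new InvocationHandler() {
          public Object invoke(Object p, Method m, Object[] a) throws Throwable {
            try {
              return m.invoke(base, a);
            } catch (InvocationTargetException e) {
              throw e.getCause(); // unwrap, as HiveSessionProxy does
            }
          }
        });
    System.out.println(proxy.greet("alice")); // prints "hello, alice"
  }
}
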
Added: hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java?rev=1455659&view=auto
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java (added)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java Tue Mar 12 18:22:00 2013
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.session;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.service.CompositeService;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.SessionHandle;
+import org.apache.hive.service.cli.operation.OperationManager;
+
+/**
+ * SessionManager.
+ *
+ */
+public class SessionManager extends CompositeService {
+
+ private HiveConf hiveConf;
+ private final Map<SessionHandle, HiveSession> handleToSession = new HashMap<SessionHandle, HiveSession>();
+ private OperationManager operationManager;
+ private static final Object sessionMapLock = new Object();
+
+ public SessionManager() {
+ super("SessionManager");
+ }
+
+ @Override
+ public synchronized void init(HiveConf hiveConf) {
+ this.hiveConf = hiveConf;
+
+ operationManager = new OperationManager();
+ addService(operationManager);
+
+ super.init(hiveConf);
+ }
+
+ @Override
+ public synchronized void start() {
+ super.start();
+ // TODO
+ }
+
+ @Override
+ public synchronized void stop() {
+ // TODO
+ super.stop();
+ }
+
+
+ public SessionHandle openSession(String username, String password, Map<String, String> sessionConf)
+ throws HiveSQLException {
+ return openSession(username, password, sessionConf, false, null);
+ }
+
+ public SessionHandle openSession(String username, String password, Map<String, String> sessionConf,
+ boolean withImpersonation, String delegationToken) throws HiveSQLException {
+ HiveSession session;
+ if (username == null) {
+ username = threadLocalUserName.get();
+ }
+
+ if (withImpersonation) {
+ HiveSessionImplwithUGI hiveSessionUgi = new HiveSessionImplwithUGI(username, password, sessionConf,
+ delegationToken);
+ session = HiveSessionProxy.getProxy(hiveSessionUgi, hiveSessionUgi.getSessionUgi());
+ hiveSessionUgi.setProxySession(session);
+ } else {
+ session = new HiveSessionImpl(username, password, sessionConf);
+ }
+ session.setSessionManager(this);
+ session.setOperationManager(operationManager);
+ synchronized(sessionMapLock) {
+ handleToSession.put(session.getSessionHandle(), session);
+ }
+ return session.getSessionHandle();
+ }
+
+ public void closeSession(SessionHandle sessionHandle) throws HiveSQLException {
+ HiveSession session;
+ synchronized(sessionMapLock) {
+ session = handleToSession.remove(sessionHandle);
+ }
+ if (session == null) {
+ throw new HiveSQLException("Session does not exist!");
+ }
+ session.close();
+ }
+
+
+ public HiveSession getSession(SessionHandle sessionHandle) throws HiveSQLException {
+ HiveSession session;
+ synchronized(sessionMapLock) {
+ session = handleToSession.get(sessionHandle);
+ }
+ if (session == null) {
+ throw new HiveSQLException("Invalid SessionHandle: " + sessionHandle);
+ }
+ return session;
+ }
+
+ public OperationManager getOperationManager() {
+ return operationManager;
+ }
+
+ private static ThreadLocal<String> threadLocalIpAddress = new ThreadLocal<String>();
+
+ public static void setIpAddress(String ipAddress) {
+ threadLocalIpAddress.set(ipAddress);
+ }
+
+ private void clearIpAddress() {
+ threadLocalIpAddress.remove();
+ }
+
+ private static ThreadLocal<String> threadLocalUserName = new ThreadLocal<String>();
+
+ public static void setUserName(String userName) {
+ threadLocalUserName.set(userName);
+ }
+
+ private void clearUserName() {
+ threadLocalUserName.remove();
+ }
+
+}
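
The SessionManager lifecycle above is init() -> start() ->
openSession()/getSession()/closeSession() -> stop(). A hedged usage sketch
against exactly the methods shown (it assumes a valid HiveConf can be built
from the classpath; the driver class name is made up):

import java.util.HashMap;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.service.cli.SessionHandle;
import org.apache.hive.service.cli.session.SessionManager;

public class SessionManagerExample {
  public static void main(String[] args) throws Exception {
    SessionManager mgr = new SessionManager();
    mgr.init(new HiveConf());
    mgr.start();
    SessionHandle handle =
        mgr.openSession("anonymous", "", new HashMap<String, String>());
    try {
      // Operations would be issued through mgr.getSession(handle).
      System.out.println("Opened session " + handle);
    } finally {
      mgr.closeSession(handle); // removes the handle from the session map
      mgr.stop();
    }
  }
}
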
Added: hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftCLIService.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftCLIService.java?rev=1455659&view=auto
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftCLIService.java (added)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftCLIService.java Tue Mar 12 18:22:00 2013
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.thrift;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.service.cli.CLIService;
+
+
+/**
+ * EmbeddedThriftCLIService.
+ *
+ */
+public class EmbeddedThriftCLIService extends ThriftCLIService {
+
+ public EmbeddedThriftCLIService() {
+ super(new CLIService());
+ isEmbedded = true;
+ cliService.init(new HiveConf());
+ cliService.start();
+ }
+}
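
EmbeddedThriftCLIService wires a CLIService in-process, so the Thrift
handler methods can be invoked directly without a socket server. A sketch,
assuming the generated TOpenSessionReq/TOpenSessionResp structs carry the
usual Thrift setters and getters:

import org.apache.hive.service.cli.thrift.EmbeddedThriftCLIService;
import org.apache.hive.service.cli.thrift.TOpenSessionReq;
import org.apache.hive.service.cli.thrift.TOpenSessionResp;

public class EmbeddedExample {
  public static void main(String[] args) throws Exception {
    // The constructor inits and starts the wrapped CLIService in this JVM.
    EmbeddedThriftCLIService service = new EmbeddedThriftCLIService();
    TOpenSessionReq req = new TOpenSessionReq();
    req.setUsername("anonymous");
    TOpenSessionResp resp = service.OpenSession(req);
    System.out.println("OpenSession status: " + resp.getStatus());
  }
}
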
Added: hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java?rev=1455659&view=auto
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java (added)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java Tue Mar 12 18:22:00 2013
@@ -0,0 +1,423 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.thrift;
+
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hive.service.AbstractService;
+import org.apache.hive.service.auth.HiveAuthFactory;
+import org.apache.hive.service.cli.FetchOrientation;
+import org.apache.hive.service.cli.GetInfoType;
+import org.apache.hive.service.cli.GetInfoValue;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.OperationHandle;
+import org.apache.hive.service.cli.OperationState;
+import org.apache.hive.service.cli.RowSet;
+import org.apache.hive.service.cli.CLIService;
+import org.apache.hive.service.cli.SessionHandle;
+import org.apache.hive.service.cli.TableSchema;
+import org.apache.thrift.TException;
+import org.apache.thrift.TProcessorFactory;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TTransportFactory;
+
+
+/**
+ * ThriftCLIService.
+ *
+ */
+public class ThriftCLIService extends AbstractService implements TCLIService.Iface, Runnable {
+
+ public static final Log LOG = LogFactory.getLog(ThriftCLIService.class.getName());
+
+
+ protected CLIService cliService;
+ private static final TStatus OK_STATUS = new TStatus(TStatusCode.SUCCESS_STATUS);
+ private static final TStatus ERROR_STATUS = new TStatus(TStatusCode.ERROR_STATUS);
+
+ private static HiveAuthFactory hiveAuthFactory;
+
+ private int portNum;
+ private InetSocketAddress serverAddress;
+ private TServer server;
+
+ private boolean isStarted = false;
+ protected boolean isEmbedded = false;
+
+ private HiveConf hiveConf;
+
+ private int minWorkerThreads;
+ private int maxWorkerThreads;
+
+
+
+ public ThriftCLIService(CLIService cliService) {
+ super("ThriftCLIService");
+ this.cliService = cliService;
+ }
+
+ @Override
+ public synchronized void init(HiveConf hiveConf) {
+ this.hiveConf = hiveConf;
+ super.init(hiveConf);
+ }
+
+ @Override
+ public synchronized void start() {
+ super.start();
+ if (!isStarted && !isEmbedded) {
+ new Thread(this).start();
+ isStarted = true;
+ }
+ }
+
+ @Override
+ public synchronized void stop() {
+ if (isStarted && !isEmbedded) {
+ server.stop();
+ isStarted = false;
+ }
+ super.stop();
+ }
+
+
+ @Override
+ public TOpenSessionResp OpenSession(TOpenSessionReq req) throws TException {
+ TOpenSessionResp resp = new TOpenSessionResp();
+ try {
+ String userName;
+ if (hiveAuthFactory != null
+ && hiveAuthFactory.getRemoteUser() != null) {
+ userName = hiveAuthFactory.getRemoteUser();
+ } else {
+ userName = req.getUsername();
+ }
+ SessionHandle sessionHandle = null;
+ if (cliService.getHiveConf().
+ getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_IMPERSONATION)) {
+ String delegationTokenStr = null;
+ try {
+ delegationTokenStr = cliService.getDelegationTokenFromMetaStore(userName);
+ } catch (UnsupportedOperationException e) {
+ // The delegation token is not applicable in the given deployment mode
+ }
+ sessionHandle = cliService.openSessionWithImpersonation(userName, req.getPassword(),
+ req.getConfiguration(), delegationTokenStr);
+ } else {
+ sessionHandle = cliService.openSession(userName, req.getPassword(),
+ req.getConfiguration());
+ }
+ resp.setSessionHandle(sessionHandle.toTSessionHandle());
+ // TODO: set real configuration map
+ resp.setConfiguration(new HashMap<String, String>());
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ e.printStackTrace();
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public TCloseSessionResp CloseSession(TCloseSessionReq req) throws TException {
+ TCloseSessionResp resp = new TCloseSessionResp();
+ try {
+ SessionHandle sessionHandle = new SessionHandle(req.getSessionHandle());
+ cliService.closeSession(sessionHandle);
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ e.printStackTrace();
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public TGetInfoResp GetInfo(TGetInfoReq req) throws TException {
+ TGetInfoResp resp = new TGetInfoResp();
+ try {
+ GetInfoValue getInfoValue =
+ cliService.getInfo(new SessionHandle(req.getSessionHandle()),
+ GetInfoType.getGetInfoType(req.getInfoType()));
+ resp.setInfoValue(getInfoValue.toTGetInfoValue());
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ e.printStackTrace();
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws TException {
+ TExecuteStatementResp resp = new TExecuteStatementResp();
+ try {
+ SessionHandle sessionHandle = new SessionHandle(req.getSessionHandle());
+ String statement = req.getStatement();
+ Map<String, String> confOverlay = req.getConfOverlay();
+ OperationHandle operationHandle =
+ cliService.executeStatement(sessionHandle, statement, confOverlay);
+ resp.setOperationHandle(operationHandle.toTOperationHandle());
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ e.printStackTrace();
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public TGetTypeInfoResp GetTypeInfo(TGetTypeInfoReq req) throws TException {
+ TGetTypeInfoResp resp = new TGetTypeInfoResp();
+ try {
+ OperationHandle operationHandle = cliService.getTypeInfo(new SessionHandle(req.getSessionHandle()));
+ resp.setOperationHandle(operationHandle.toTOperationHandle());
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ e.printStackTrace();
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public TGetCatalogsResp GetCatalogs(TGetCatalogsReq req) throws TException {
+ TGetCatalogsResp resp = new TGetCatalogsResp();
+ try {
+ OperationHandle opHandle = cliService.getCatalogs(new SessionHandle(req.getSessionHandle()));
+ resp.setOperationHandle(opHandle.toTOperationHandle());
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ e.printStackTrace();
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public TGetSchemasResp GetSchemas(TGetSchemasReq req) throws TException {
+ TGetSchemasResp resp = new TGetSchemasResp();
+ try {
+ OperationHandle opHandle = cliService.getSchemas(
+ new SessionHandle(req.getSessionHandle()), req.getCatalogName(), req.getSchemaName());
+ resp.setOperationHandle(opHandle.toTOperationHandle());
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ e.printStackTrace();
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public TGetTablesResp GetTables(TGetTablesReq req) throws TException {
+ TGetTablesResp resp = new TGetTablesResp();
+ try {
+ OperationHandle opHandle = cliService
+ .getTables(new SessionHandle(req.getSessionHandle()), req.getCatalogName(),
+ req.getSchemaName(), req.getTableName(), req.getTableTypes());
+ resp.setOperationHandle(opHandle.toTOperationHandle());
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ e.printStackTrace();
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public TGetTableTypesResp GetTableTypes(TGetTableTypesReq req) throws TException {
+ TGetTableTypesResp resp = new TGetTableTypesResp();
+ try {
+ OperationHandle opHandle = cliService.getTableTypes(new SessionHandle(req.getSessionHandle()));
+ resp.setOperationHandle(opHandle.toTOperationHandle());
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ e.printStackTrace();
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public TGetColumnsResp GetColumns(TGetColumnsReq req) throws TException {
+ TGetColumnsResp resp = new TGetColumnsResp();
+ try {
+ OperationHandle opHandle = cliService.getColumns(
+ new SessionHandle(req.getSessionHandle()),
+ req.getCatalogName(),
+ req.getSchemaName(),
+ req.getTableName(),
+ req.getColumnName());
+ resp.setOperationHandle(opHandle.toTOperationHandle());
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ e.printStackTrace();
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public TGetFunctionsResp GetFunctions(TGetFunctionsReq req) throws TException {
+ TGetFunctionsResp resp = new TGetFunctionsResp();
+ try {
+ OperationHandle opHandle = cliService.getFunctions(
+ new SessionHandle(req.getSessionHandle()), req.getCatalogName(),
+ req.getSchemaName(), req.getFunctionName());
+ resp.setOperationHandle(opHandle.toTOperationHandle());
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ e.printStackTrace();
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq req) throws TException {
+ TGetOperationStatusResp resp = new TGetOperationStatusResp();
+ try {
+ OperationState operationState = cliService.getOperationStatus(new OperationHandle(req.getOperationHandle()));
+ resp.setOperationState(operationState.toTOperationState());
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ e.printStackTrace();
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public TCancelOperationResp CancelOperation(TCancelOperationReq req) throws TException {
+ TCancelOperationResp resp = new TCancelOperationResp();
+ try {
+ cliService.cancelOperation(new OperationHandle(req.getOperationHandle()));
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ e.printStackTrace();
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public TCloseOperationResp CloseOperation(TCloseOperationReq req) throws TException {
+ TCloseOperationResp resp = new TCloseOperationResp();
+ try {
+ cliService.closeOperation(new OperationHandle(req.getOperationHandle()));
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ e.printStackTrace();
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public TGetResultSetMetadataResp GetResultSetMetadata(TGetResultSetMetadataReq req)
+ throws TException {
+ TGetResultSetMetadataResp resp = new TGetResultSetMetadataResp();
+ try {
+ TableSchema schema = cliService.getResultSetMetadata(new OperationHandle(req.getOperationHandle()));
+ resp.setSchema(schema.toTTableSchema());
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ e.printStackTrace();
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+ @Override
+ public TFetchResultsResp FetchResults(TFetchResultsReq req) throws TException {
+ TFetchResultsResp resp = new TFetchResultsResp();
+ try {
+ RowSet rowSet = cliService.fetchResults(
+ new OperationHandle(req.getOperationHandle()),
+ FetchOrientation.getFetchOrientation(req.getOrientation()),
+ req.getMaxRows());
+ resp.setResults(rowSet.toTRowSet());
+ resp.setHasMoreRows(false);
+ resp.setStatus(OK_STATUS);
+ } catch (Exception e) {
+ e.printStackTrace();
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
+
+ @Override
+ public void run() {
+ try {
+ hiveAuthFactory = new HiveAuthFactory();
+ TTransportFactory transportFactory = hiveAuthFactory.getAuthTransFactory();
+ TProcessorFactory processorFactory = hiveAuthFactory.getAuthProcFactory(this);
+
+ String portString = System.getenv("HIVE_SERVER2_THRIFT_PORT");
+ if (portString != null) {
+ portNum = Integer.valueOf(portString);
+ } else {
+ portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT);
+ }
+
+ String hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST");
+ if (hiveHost == null) {
+ hiveHost = hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
+ }
+
+ if (hiveHost != null && !hiveHost.isEmpty()) {
+ serverAddress = new InetSocketAddress(hiveHost, portNum);
+ } else {
+ serverAddress = new InetSocketAddress(portNum);
+ }
+
+
+ minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS);
+ maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS);
+
+
+ TThreadPoolServer.Args sargs = new TThreadPoolServer.Args(new TServerSocket(serverAddress))
+ .processorFactory(processorFactory)
+ .transportFactory(transportFactory)
+ .protocolFactory(new TBinaryProtocol.Factory())
+ .minWorkerThreads(minWorkerThreads)
+ .maxWorkerThreads(maxWorkerThreads);
+
+ server = new TThreadPoolServer(sargs);
+
+ LOG.info("ThriftCLIService listening on " + serverAddress);
+
+ server.serve();
+ } catch (Throwable t) {
+ LOG.error("Error starting ThriftCLIService", t);
+ }
+ }
+
+}
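
run() above binds a TThreadPoolServer speaking TBinaryProtocol, so any
generated TCLIService.Client can reach it over a plain socket. A sketch of
a remote caller (host and port are assumptions; 10000 is only the
conventional HIVE_SERVER2_THRIFT_PORT default, and this ignores any SASL
transport that HiveAuthFactory may layer on):

import org.apache.hive.service.cli.thrift.TCLIService;
import org.apache.hive.service.cli.thrift.TOpenSessionReq;
import org.apache.hive.service.cli.thrift.TOpenSessionResp;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

public class ThriftCLIClientExample {
  public static void main(String[] args) throws Exception {
    TTransport transport = new TSocket("localhost", 10000);
    transport.open();
    TCLIService.Client client =
        new TCLIService.Client(new TBinaryProtocol(transport));
    TOpenSessionReq req = new TOpenSessionReq();
    req.setUsername("anonymous");
    TOpenSessionResp resp = client.OpenSession(req);
    System.out.println("Session handle: " + resp.getSessionHandle());
    transport.close();
  }
}
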