You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2013/03/12 19:22:03 UTC

svn commit: r1455659 [10/11] - in /hive/trunk: ./ ant/src/org/apache/hadoop/hive/ant/ bin/ bin/ext/ cli/ common/ common/src/gen/ common/src/gen/org/ common/src/gen/org/apache/ common/src/gen/org/apache/hive/ common/src/gen/org/apache/hive/common/ commo...

Added: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java?rev=1455659&view=auto
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java (added)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java Tue Mar 12 18:22:00 2013
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.operation;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.ql.processors.CommandProcessor;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.service.cli.FetchOrientation;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.OperationState;
+import org.apache.hive.service.cli.RowSet;
+import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.session.HiveSession;
+
+/**
+ * HiveCommandOperation.
+ *
+ */
+public abstract class HiveCommandOperation extends ExecuteStatementOperation {
+  private CommandProcessorResponse response;
+  private CommandProcessor commandProcessor;
+  private TableSchema resultSchema = null;
+
+  /**
+   * Processors other than Hive queries (Driver) write their output to session.out (a temp
+   * file) first; getNextRowSet then reads that output back through this reader.
+   */
+  private BufferedReader resultReader;
+
+
+  protected HiveCommandOperation(HiveSession parentSession, String statement, Map<String, String> confOverlay) {
+    super(parentSession, statement, confOverlay);
+    setupSessionIO(parentSession.getSessionState());
+  }
+
+  // Routes the session's out stream to its temp output file, falling back to stdout/stderr.
+  private void setupSessionIO(SessionState sessionState) {
+    sessionState.in = null; // hive server's session input stream is not used
+    try {
+      LOG.info("Putting temp output to file " + sessionState.getTmpOutputFile().toString());
+      // open a per-session file in auto-flush mode for writing temp results
+      sessionState.out = new PrintStream(new FileOutputStream(sessionState.getTmpOutputFile()), true, "UTF-8");
+      // TODO: for hadoop jobs, progress is printed out to session.err,
+      // we should find a way to feed back job progress to client
+      sessionState.err = new PrintStream(System.err, true, "UTF-8");
+    } catch (IOException e) {
+      LOG.error("Error in creating temp output file ", e);
+      try {
+        sessionState.out = new PrintStream(System.out, true, "UTF-8");
+        sessionState.err = new PrintStream(System.err, true, "UTF-8");
+      } catch (UnsupportedEncodingException ee) {
+        LOG.error("Error in creating fallback session streams ", ee);
+        sessionState.out = null;
+        sessionState.err = null;
+      }
+    }
+  }
+
+  /**
+   * Strips the leading command keyword from the statement, hands the remaining arguments
+   * to the command processor, and records whether the response carries a result schema.
+   * @throws HiveSQLException if the processor reports a non-zero response code or fails
+   */
+  @Override
+  public void run() throws HiveSQLException {
+    setState(OperationState.RUNNING);
+    try {
+      String command = getStatement().trim();
+      // split the trimmed text: leading whitespace would otherwise yield an empty first token
+      String commandArgs = command.substring(command.split("\\s")[0].length()).trim();
+      response = getCommandProcessor().run(commandArgs);
+      if (0 != response.getResponseCode()) {
+        throw new HiveSQLException("Error while processing statement: "
+            + response.getErrorMessage(), response.getSQLState(), response.getResponseCode());
+      }
+      Schema schema = response.getSchema();
+      setHasResultSet(schema != null);
+      resultSchema = (schema != null) ? new TableSchema(schema) : new TableSchema();
+    } catch (HiveSQLException e) {
+      setState(OperationState.ERROR);
+      throw e;
+    } catch (Exception e) {
+      setState(OperationState.ERROR);
+      throw new HiveSQLException("Error running query: " + e.toString());
+    }
+    setState(OperationState.FINISHED);
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hive.service.cli.operation.Operation#close()
+   */
+  @Override
+  public void close() throws HiveSQLException {
+    setState(OperationState.CLOSED);
+    cleanTmpFile();
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hive.service.cli.operation.Operation#getResultSetSchema()
+   */
+  @Override
+  public TableSchema getResultSetSchema() throws HiveSQLException {
+    return resultSchema;
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hive.service.cli.operation.Operation#getNextRowSet(org.apache.hive.service.cli.FetchOrientation, long)
+   */
+  @Override
+  public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
+    List<String> rows = readResults((int) maxRows);
+    RowSet rowSet = new RowSet();
+
+    for (String row : rows) {
+      rowSet.addRow(resultSchema, new String[] {row});
+    }
+    return rowSet;
+  }
+
+  /**
+   * Reads the temporary results for non-Hive (non-Driver) commands into a list of strings.
+   * The reader is opened on first use, as UTF-8 to match session.out, and reused afterwards.
+   * @param nLines number of lines read at once. If it is <= 0, then read all lines.
+   * @return the lines read, empty once the end of the result file has been reached
+   */
+  private List<String> readResults(int nLines) throws HiveSQLException {
+    if (resultReader == null) {
+      SessionState sessionState = getParentSession().getSessionState();
+      File tmp = sessionState.getTmpOutputFile();
+      try {
+        resultReader = new BufferedReader(new java.io.InputStreamReader(
+            new java.io.FileInputStream(tmp), java.nio.charset.Charset.forName("UTF-8")));
+      } catch (FileNotFoundException e) {
+        LOG.error("File " + tmp + " not found. ", e);
+        throw new HiveSQLException(e);
+      }
+    }
+
+    List<String> results = new ArrayList<String>();
+    try {
+      String line;
+      // nLines <= 0 means "no limit"; otherwise stop once nLines lines have been collected
+      while ((nLines <= 0 || results.size() < nLines) && (line = resultReader.readLine()) != null) {
+        results.add(line);
+      }
+    } catch (IOException e) {
+      LOG.error("Reading temp results encountered an exception: ", e);
+      throw new HiveSQLException(e);
+    }
+    return results;
+  }
+
+  /** Closes the result reader, if one was opened, and deletes the temp file it was reading. */
+  private void cleanTmpFile() {
+    if (resultReader != null) {
+      try {
+        resultReader.close(); // release the file handle before deleting the file
+      } catch (IOException e) {
+        LOG.warn("Error closing temp result reader", e);
+      }
+      getParentSession().getSessionState().getTmpOutputFile().delete();
+      resultReader = null;
+    }
+  }
+
+  protected CommandProcessor getCommandProcessor() {
+    return commandProcessor;
+  }
+
+  protected void setCommandProcessor(CommandProcessor commandProcessor) {
+    this.commandProcessor = commandProcessor;
+  }
+}

Added: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java?rev=1455659&view=auto
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java (added)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java Tue Mar 12 18:22:00 2013
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.operation;
+
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.OperationState;
+import org.apache.hive.service.cli.OperationType;
+import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.session.HiveSession;
+
+/**
+ * MetadataOperation.
+ *
+ */
+public abstract class MetadataOperation extends Operation {
+
+  protected static final String DEFAULT_HIVE_CATALOG = "";
+  protected static TableSchema RESULT_SET_SCHEMA;
+  private static final char SEARCH_STRING_ESCAPE = '\\';
+
+  protected MetadataOperation(HiveSession parentSession, OperationType opType) {
+    super(parentSession, opType);
+    setHasResultSet(true);
+  }
+
+
+  /* (non-Javadoc)
+   * @see org.apache.hive.service.cli.Operation#close()
+   */
+  @Override
+  public void close() throws HiveSQLException {
+    setState(OperationState.CLOSED);
+  }
+
+  /**
+   * Convert JDBC wildcards/escapes to DataNucleus or regex form; null becomes "%" (DataNucleus).
+   */
+  protected String convertIdentifierPattern(final String pattern, boolean datanucleusFormat) {
+    if (pattern == null) {
+      return convertPattern("%", true);
+    } else {
+      return convertPattern(pattern, datanucleusFormat);
+    }
+  }
+
+  /**
+   * Convert wildcards and escape sequences of a schema pattern from JDBC format to
+   * DataNucleus format. A null or empty schema pattern is treated as the wildcard "%".
+   */
+  protected String convertSchemaPattern(final String pattern) {
+    if ((pattern == null) || pattern.isEmpty()) {
+      return convertPattern("%", true);
+    } else {
+      return convertPattern(pattern, true);
+    }
+  }
+
+  /**
+   * Convert a pattern containing JDBC catalog search wildcards into DataNucleus or
+   * Java regex wildcards, scanning the input once from left to right.
+   * @param pattern input which may contain '%' or '_' wildcard characters, or
+   * these characters escaped using {@link #SEARCH_STRING_ESCAPE}.
+   * @param datanucleusFormat true to emit '*' for '%' as the datanucleus module expects,
+   * false to emit the regex form '.*' used when matching locally inside the hive code.
+   * @return the pattern with each unescaped '%' replaced by that wildcard, each unescaped
+   * '_' replaced by '.', and each escaped wildcard reduced to the literal character.
+   */
+  private String convertPattern(final String pattern, boolean datanucleusFormat) {
+    final String wStr = datanucleusFormat ? "*" : ".*";
+    final StringBuilder converted = new StringBuilder();
+    for (int i = 0; i < pattern.length(); i++) {
+      final char c = pattern.charAt(i);
+      final boolean hasNext = i + 1 < pattern.length();
+      if (c == SEARCH_STRING_ESCAPE && hasNext && "%_".indexOf(pattern.charAt(i + 1)) >= 0) {
+        converted.append(pattern.charAt(++i)); // escaped wildcard stays a literal
+      } else {
+        converted.append(c == '%' ? wStr : (c == '_' ? "." : String.valueOf(c)));
+      }
+    }
+    return converted.toString();
+  }
+
+}

Added: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java?rev=1455659&view=auto
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java (added)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java Tue Mar 12 18:22:00 2013
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.service.cli.operation;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.service.cli.FetchOrientation;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.OperationHandle;
+import org.apache.hive.service.cli.OperationState;
+import org.apache.hive.service.cli.OperationType;
+import org.apache.hive.service.cli.RowSet;
+import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.session.HiveSession;
+
+
+
+public abstract class Operation {
+  private final HiveSession parentSession;
+  private OperationState state = OperationState.INITIALIZED;
+  private final OperationHandle opHandle;
+  private HiveConf configuration;
+  public static final Log LOG = LogFactory.getLog(Operation.class.getName());
+  public static final long DEFAULT_FETCH_MAX_ROWS = 100;
+  protected boolean hasResultSet;
+
+  protected Operation(HiveSession parentSession, OperationType opType) {
+    super();
+    this.parentSession = parentSession;
+    opHandle = new OperationHandle(opType);
+  }
+
+  public void setConfiguration(HiveConf configuration) {
+    this.configuration = new HiveConf(configuration);
+  }
+
+  public HiveConf getConfiguration() {
+    return new HiveConf(configuration);
+  }
+
+  public HiveSession getParentSession() {
+    return parentSession;
+  }
+
+  public OperationHandle getHandle() {
+    return opHandle;
+  }
+
+  public OperationType getType() {
+    return opHandle.getOperationType();
+  }
+
+  public OperationState getState() {
+    return state;
+  }
+
+  public boolean hasResultSet() {
+    return hasResultSet;
+  }
+
+  protected void setHasResultSet(boolean hasResultSet) {
+    this.hasResultSet = hasResultSet;
+    opHandle.setHasResultSet(hasResultSet);
+  }
+
+  protected final OperationState setState(OperationState newState) throws HiveSQLException {
+    state.validateTransition(newState);
+    this.state = newState;
+    return this.state;
+  }
+
+  protected final void assertState(OperationState state) throws HiveSQLException {
+    if (this.state != state) {
+      throw new HiveSQLException("Expected state " + state + ", but found " + this.state);
+    }
+  }
+
+  public boolean isRunning() {
+    return OperationState.RUNNING.equals(getState());
+  }
+
+  public boolean isFinished() {
+    return OperationState.FINISHED.equals(getState());
+  }
+
+  public boolean isCanceled() {
+    return OperationState.CANCELED.equals(getState());
+  }
+
+  public boolean isFailed() {
+    return OperationState.ERROR.equals(getState());
+  }
+
+  public abstract void run() throws HiveSQLException;
+
+  // TODO: make this abstract and implement in subclasses. Sets CANCELED, then always throws.
+  public void cancel() throws HiveSQLException {
+    setState(OperationState.CANCELED);
+    throw new UnsupportedOperationException("SQLOperation.cancel()");
+  }
+
+  public abstract void close() throws HiveSQLException;
+
+  public abstract TableSchema getResultSetSchema() throws HiveSQLException;
+
+  public abstract RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException;
+
+  public RowSet getNextRowSet() throws HiveSQLException {
+    return getNextRowSet(FetchOrientation.FETCH_NEXT, DEFAULT_FETCH_MAX_ROWS);
+  }
+}

Added: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java?rev=1455659&view=auto
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java (added)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java Tue Mar 12 18:22:00 2013
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.operation;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.service.AbstractService;
+import org.apache.hive.service.cli.FetchOrientation;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.OperationHandle;
+import org.apache.hive.service.cli.OperationState;
+import org.apache.hive.service.cli.RowSet;
+import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.session.HiveSession;
+
+/**
+ * OperationManager.
+ *
+ */
+public class OperationManager extends AbstractService {
+
+  private HiveConf hiveConf;
+  private final Map<OperationHandle, Operation> handleToOperation =
+      new HashMap<OperationHandle, Operation>();
+
+  public OperationManager() {
+    super("OperationManager");
+  }
+
+  @Override
+  public synchronized void init(HiveConf hiveConf) {
+    this.hiveConf = hiveConf;
+
+    super.init(hiveConf);
+  }
+
+  @Override
+  public synchronized void start() {
+    super.start();
+    // TODO
+  }
+
+  @Override
+  public synchronized void stop() {
+    // TODO
+    super.stop();
+  }
+
+  public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession,
+      String statement, Map<String, String> confOverlay) {
+    ExecuteStatementOperation executeStatementOperation = ExecuteStatementOperation
+        .newExecuteStatementOperation(parentSession, statement, confOverlay);
+    addOperation(executeStatementOperation);
+    return executeStatementOperation;
+  }
+
+  public GetTypeInfoOperation newGetTypeInfoOperation(HiveSession parentSession) {
+    GetTypeInfoOperation operation = new GetTypeInfoOperation(parentSession);
+    addOperation(operation);
+    return operation;
+  }
+
+  public GetCatalogsOperation newGetCatalogsOperation(HiveSession parentSession) {
+    GetCatalogsOperation operation = new GetCatalogsOperation(parentSession);
+    addOperation(operation);
+    return operation;
+  }
+
+  public GetSchemasOperation newGetSchemasOperation(HiveSession parentSession,
+      String catalogName, String schemaName) {
+    GetSchemasOperation operation = new GetSchemasOperation(parentSession, catalogName, schemaName);
+    addOperation(operation);
+    return operation;
+  }
+
+  public MetadataOperation newGetTablesOperation(HiveSession parentSession,
+      String catalogName, String schemaName, String tableName,
+      List<String> tableTypes) {
+    MetadataOperation operation =
+        new GetTablesOperation(parentSession, catalogName, schemaName, tableName, tableTypes);
+    addOperation(operation);
+    return operation;
+  }
+
+  public GetTableTypesOperation newGetTableTypesOperation(HiveSession parentSession) {
+    GetTableTypesOperation operation = new GetTableTypesOperation(parentSession);
+    addOperation(operation);
+    return operation;
+  }
+
+  public GetColumnsOperation newGetColumnsOperation(HiveSession parentSession,
+      String catalogName, String schemaName, String tableName, String columnName) {
+    GetColumnsOperation operation = new GetColumnsOperation(parentSession,
+        catalogName, schemaName, tableName, columnName);
+    addOperation(operation);
+    return operation;
+  }
+
+  public GetFunctionsOperation newGetFunctionsOperation(HiveSession parentSession,
+      String catalogName, String schemaName, String functionName) {
+    GetFunctionsOperation operation = new GetFunctionsOperation(parentSession,
+        catalogName, schemaName, functionName);
+    addOperation(operation);
+    return operation;
+  }
+
+  public synchronized Operation getOperation(OperationHandle operationHandle) throws HiveSQLException {
+    Operation operation = handleToOperation.get(operationHandle);
+    if (operation == null) {
+      throw new HiveSQLException("Invalid OperationHandle: " + operationHandle);
+    }
+    return operation;
+  }
+
+  private synchronized void addOperation(Operation operation) {
+    handleToOperation.put(operation.getHandle(), operation);
+  }
+
+  private synchronized Operation removeOperation(OperationHandle opHandle) {
+    return handleToOperation.remove(opHandle);
+  }
+
+  public OperationState getOperationState(OperationHandle opHandle) throws HiveSQLException {
+    return getOperation(opHandle).getState();
+  }
+
+  public void cancelOperation(OperationHandle opHandle) throws HiveSQLException {
+    getOperation(opHandle).cancel();
+  }
+
+  public void closeOperation(OperationHandle opHandle) throws HiveSQLException {
+    Operation operation = removeOperation(opHandle);
+    if (operation == null) {
+      throw new HiveSQLException("Operation does not exist!");
+    }
+    operation.close();
+  }
+
+  public TableSchema getOperationResultSetSchema(OperationHandle opHandle)
+      throws HiveSQLException {
+    return getOperation(opHandle).getResultSetSchema();
+  }
+
+  public RowSet getOperationNextRowSet(OperationHandle opHandle) throws HiveSQLException {
+    return getOperation(opHandle).getNextRowSet();
+  }
+
+  public RowSet getOperationNextRowSet(OperationHandle opHandle,
+      FetchOrientation orientation, long maxRows)
+      throws HiveSQLException {
+    return getOperation(opHandle).getNextRowSet(orientation, maxRows);
+  }
+}

Added: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java?rev=1455659&view=auto
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java (added)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java Tue Mar 12 18:22:00 2013
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.operation;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.parse.VariableSubstitution;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.service.cli.FetchOrientation;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.OperationState;
+import org.apache.hive.service.cli.RowSet;
+import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.session.HiveSession;
+
+/**
+ * SQLOperation.
+ *
+ */
+public class SQLOperation extends ExecuteStatementOperation {
+
+  private Driver driver = null;
+  private CommandProcessorResponse response;
+  private TableSchema resultSchema = null;
+  private Schema mResultSchema = null;
+  private SerDe serde = null;
+
+
+  public SQLOperation(HiveSession parentSession, String statement, Map<String, String> confOverlay) {
+    // TODO: call setRemoteUser in ExecuteStatementOperation or higher.
+    super(parentSession, statement, confOverlay);
+  }
+
+
+  public void prepare() throws HiveSQLException {
+  }
+
+  /**
+   * Runs the statement through a new {@link Driver} after variable substitution against
+   * the parent session's HiveConf, then records the driver's result schema, if any.
+   * The state moves to ERROR before a failure from the try block is rethrown.
+   *
+   * @throws HiveSQLException if the driver returns a non-zero response code (carrying
+   * its error message, SQL state and response code) or any other exception is raised
+   */
+  @Override
+  public void run() throws HiveSQLException {
+    setState(OperationState.RUNNING);
+    try {
+      driver = new Driver(getParentSession().getHiveConf());
+      // In Hive server mode, we are not able to retry in the FetchTask
+      // case, when calling fetch queries since execute() has returned.
+      // For now, we disable the test attempts.
+      driver.setTryCount(Integer.MAX_VALUE);
+
+      String subStatement = new VariableSubstitution().substitute(getParentSession().getHiveConf(), statement);
+
+      response = driver.run(subStatement);
+      if (0 != response.getResponseCode()) {
+        throw new HiveSQLException("Error while processing statement: "
+            + response.getErrorMessage(), response.getSQLState(), response.getResponseCode());
+      }
+
+      mResultSchema = driver.getSchema();
+      if (mResultSchema != null && mResultSchema.isSetFieldSchemas()) {
+        resultSchema = new TableSchema(mResultSchema);
+        setHasResultSet(true);
+      } else {
+        setHasResultSet(false);
+      }
+    } catch (HiveSQLException e) {
+      setState(OperationState.ERROR);
+      throw e;
+    } catch (Exception e) {
+      setState(OperationState.ERROR);
+      throw new HiveSQLException("Error running query: " + e.toString());
+    }
+    setState(OperationState.FINISHED);
+  }
+
+  @Override
+  public void cancel() throws HiveSQLException {
+    setState(OperationState.CANCELED);
+    releaseDriverAndTmpOutput();
+  }
+
+  @Override
+  public void close() throws HiveSQLException {
+    setState(OperationState.CLOSED);
+    releaseDriverAndTmpOutput();
+  }
+
+  /**
+   * Closes and destroys the driver, if one was created, and deletes the temp output file
+   * of the SessionState returned by SessionState.get(). Shared by cancel() and close().
+   */
+  private void releaseDriverAndTmpOutput() {
+    if (driver != null) {
+      driver.close();
+      driver.destroy();
+    }
+    SessionState session = SessionState.get();
+    // guard against a missing SessionState so cleanup cannot fail with an NPE
+    if (session != null && session.getTmpOutputFile() != null) {
+      session.getTmpOutputFile().delete();
+    }
+  }
+
+  @Override
+  public TableSchema getResultSetSchema() throws HiveSQLException {
+    assertState(OperationState.FINISHED);
+    if (resultSchema == null) {
+      resultSchema = new TableSchema(driver.getSchema());
+    }
+    return resultSchema;
+  }
+
+
+  /**
+   * Fetches up to maxRows rows from the driver and deserializes each one into Java objects
+   * with the serde built from the result schema. The orientation argument is not consulted.
+   * @throws HiveSQLException wrapping any exception raised while fetching or deserializing
+   */
+  @Override
+  public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
+    assertState(OperationState.FINISHED);
+    ArrayList<String> rows = new ArrayList<String>();
+    driver.setMaxRows((int)maxRows);
+
+    try {
+      driver.getResults(rows);
+
+      getSerDe();
+      StructObjectInspector soi = (StructObjectInspector) serde.getObjectInspector();
+      List<? extends StructField> fieldRefs = soi.getAllStructFieldRefs();
+      RowSet rowSet = new RowSet();
+
+      Object[] deserializedFields = new Object[fieldRefs.size()];
+      for (String rowString : rows) {
+        // encode as UTF-8 explicitly instead of relying on the platform default charset
+        Object rowObj = serde.deserialize(new BytesWritable(rowString.getBytes("UTF-8")));
+        for (int i = 0; i < fieldRefs.size(); i++) {
+          StructField fieldRef = fieldRefs.get(i);
+          ObjectInspector fieldOI = fieldRef.getFieldObjectInspector();
+          deserializedFields[i] = convertLazyToJava(soi.getStructFieldData(rowObj, fieldRef), fieldOI);
+        }
+        rowSet.addRow(resultSchema, deserializedFields);
+      }
+      return rowSet;
+    } catch (Exception e) {
+      // IOException, CommandNeedRetryException and all others were wrapped identically
+      throw new HiveSQLException(e);
+    }
+  }
+
+  /**
+   * Convert a LazyObject to a standard Java object in compliance with JDBC 3.0 (see JDBC 3.0
+   * Specification, Table B-3: Mapping from JDBC Types to Java Object Types).
+   *
+   * This method is kept consistent with {@link HiveResultSetMetaData#hiveTypeToSqlType}.
+   */
+  private static Object convertLazyToJava(Object o, ObjectInspector oi) {
+    Object obj = ObjectInspectorUtils.copyToStandardObject(o, oi, ObjectInspectorCopyOption.JAVA);
+
+    // for now, expose non-primitive as a string
+    // TODO: expose non-primitive as a structured object while maintaining JDBC compliance
+    if (obj != null && oi.getCategory() != ObjectInspector.Category.PRIMITIVE) {
+      obj = obj.toString();
+    }
+
+    return obj;
+  }
+
+
+  /**
+   * Lazily creates the LazySimpleSerDe used to deserialize fetched rows, configured with
+   * the comma-separated column names and types of the driver's result schema.
+   * @return the cached serde, created and initialized on the first call
+   * @throws SQLException if the serde cannot be created or initialized
+   */
+  private SerDe getSerDe() throws SQLException {
+    if (serde != null) {
+      return serde;
+    }
+    try {
+      List<FieldSchema> fieldSchemas = mResultSchema.getFieldSchemas();
+      StringBuilder namesSb = new StringBuilder();
+      StringBuilder typesSb = new StringBuilder();
+      if (fieldSchemas != null && !fieldSchemas.isEmpty()) {
+        for (int pos = 0; pos < fieldSchemas.size(); pos++) {
+          if (pos != 0) {
+            namesSb.append(",");
+            typesSb.append(",");
+          }
+          namesSb.append(fieldSchemas.get(pos).getName());
+          typesSb.append(fieldSchemas.get(pos).getType());
+        }
+      }
+      String names = namesSb.toString();
+      String types = typesSb.toString();
+      Properties props = new Properties();
+      if (names.length() > 0) {
+        LOG.debug("Column names: " + names);
+        props.setProperty(serdeConstants.LIST_COLUMNS, names);
+      }
+      if (types.length() > 0) {
+        LOG.debug("Column types: " + types);
+        props.setProperty(serdeConstants.LIST_COLUMN_TYPES, types);
+      }
+      // publish to the field only once initialized, so a failed attempt is not cached
+      SerDe initialized = new LazySimpleSerDe();
+      initialized.initialize(new HiveConf(), props);
+      serde = initialized;
+    } catch (Exception ex) {
+      throw new SQLException("Could not create ResultSet: " + ex.getMessage(), ex);
+    }
+    return serde;
+  }
+
+}

Added: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SetOperation.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SetOperation.java?rev=1455659&view=auto
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SetOperation.java (added)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SetOperation.java Tue Mar 12 18:22:00 2013
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.operation;
+
+import java.util.Map;
+
+import org.apache.hadoop.hive.ql.processors.SetProcessor;
+import org.apache.hive.service.cli.session.HiveSession;
+
+/**
+ * SetOperation.
+ *
+ * A {@link HiveCommandOperation} whose command processor is a {@link SetProcessor}.
+ * The constructor forwards the parent session, the statement text and the
+ * configuration overlay to the superclass unchanged and then installs a newly
+ * created {@code SetProcessor} through {@code setCommandProcessor}.
+ */
+public class SetOperation extends HiveCommandOperation {
+
+  protected SetOperation(HiveSession parentSession, String statement,
+      Map<String, String> confOverlay) {
+    super(parentSession, statement, confOverlay);
+    setCommandProcessor(new SetProcessor());
+  }
+
+}

Added: hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSession.java?rev=1455659&view=auto
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSession.java (added)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSession.java Tue Mar 12 18:22:00 2013
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.session;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.service.cli.FetchOrientation;
+import org.apache.hive.service.cli.GetInfoType;
+import org.apache.hive.service.cli.GetInfoValue;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.OperationHandle;
+import org.apache.hive.service.cli.RowSet;
+import org.apache.hive.service.cli.SessionHandle;
+import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.operation.OperationManager;
+
+/**
+ * Operations available on one session of the Hive CLI service: accessors for
+ * the session's identity and configuration, methods that start an operation and
+ * return its {@link OperationHandle}, and methods that manage an operation or
+ * read its results by handle.
+ */
+public interface HiveSession {
+
+  /**
+   * Sets the session manager for this session.
+   *
+   * @param sessionManager the session manager to associate with this session
+   */
+  void setSessionManager(SessionManager sessionManager);
+
+  /**
+   * Sets the operation manager for this session.
+   *
+   * @param operationManager the operation manager this session should use
+   */
+  void setOperationManager(OperationManager operationManager);
+
+  /**
+   * @return the handle of this session
+   */
+  SessionHandle getSessionHandle();
+
+  /**
+   * @return the user name of this session (see also {@link #getUserName()},
+   *         which differs only in capitalisation of the method name)
+   */
+  String getUsername();
+
+  /**
+   * @return the password of this session
+   */
+  String getPassword();
+
+  /**
+   * @return the Hive configuration of this session
+   */
+  HiveConf getHiveConf();
+
+  /**
+   * @return a metastore client for this session
+   * @throws HiveSQLException if the client cannot be provided
+   */
+  IMetaStoreClient getMetaStoreClient() throws HiveSQLException;
+
+  /**
+   * getInfo operation handler.
+   *
+   * @param getInfoType the kind of information requested
+   * @return the value for the requested kind of information
+   * @throws HiveSQLException if the request cannot be served
+   */
+  GetInfoValue getInfo(GetInfoType getInfoType) throws HiveSQLException;
+
+  /**
+   * execute operation handler.
+   *
+   * @param statement the statement text to execute
+   * @param confOverlay configuration entries accompanying the statement
+   * @return the handle of the operation created for the statement
+   * @throws HiveSQLException if the operation fails
+   */
+  OperationHandle executeStatement(String statement, Map<String, String> confOverlay)
+      throws HiveSQLException;
+
+  /**
+   * getTypeInfo operation handler.
+   *
+   * @return the handle of the created operation
+   * @throws HiveSQLException if the operation fails
+   */
+  OperationHandle getTypeInfo() throws HiveSQLException;
+
+  /**
+   * getCatalogs operation handler.
+   *
+   * @return the handle of the created operation
+   * @throws HiveSQLException if the operation fails
+   */
+  OperationHandle getCatalogs() throws HiveSQLException;
+
+  /**
+   * getSchemas operation handler.
+   *
+   * @param catalogName the catalog name passed to the operation
+   * @param schemaName the schema name passed to the operation
+   * @return the handle of the created operation
+   * @throws HiveSQLException if the operation fails
+   */
+  OperationHandle getSchemas(String catalogName, String schemaName) throws HiveSQLException;
+
+  /**
+   * getTables operation handler.
+   *
+   * @param catalogName the catalog name passed to the operation
+   * @param schemaName the schema name passed to the operation
+   * @param tableName the table name passed to the operation
+   * @param tableTypes the table types passed to the operation
+   * @return the handle of the created operation
+   * @throws HiveSQLException if the operation fails
+   */
+  OperationHandle getTables(String catalogName, String schemaName, String tableName,
+      List<String> tableTypes) throws HiveSQLException;
+
+  /**
+   * getTableTypes operation handler.
+   *
+   * @return the handle of the created operation
+   * @throws HiveSQLException if the operation fails
+   */
+  OperationHandle getTableTypes() throws HiveSQLException;
+
+  /**
+   * getColumns operation handler.
+   *
+   * @param catalogName the catalog name passed to the operation
+   * @param schemaName the schema name passed to the operation
+   * @param tableName the table name passed to the operation
+   * @param columnName the column name passed to the operation
+   * @return the handle of the created operation
+   * @throws HiveSQLException if the operation fails
+   */
+  OperationHandle getColumns(String catalogName, String schemaName, String tableName,
+      String columnName) throws HiveSQLException;
+
+  /**
+   * getFunctions operation handler.
+   *
+   * @param catalogName the catalog name passed to the operation
+   * @param schemaName the schema name passed to the operation
+   * @param functionName the function name passed to the operation
+   * @return the handle of the created operation
+   * @throws HiveSQLException if the operation fails
+   */
+  OperationHandle getFunctions(String catalogName, String schemaName, String functionName)
+      throws HiveSQLException;
+
+  /**
+   * Closes the session.
+   *
+   * @throws HiveSQLException if closing fails
+   */
+  void close() throws HiveSQLException;
+
+  /**
+   * Cancels the operation identified by the handle.
+   *
+   * @param opHandle the handle of the operation to cancel
+   * @throws HiveSQLException if the operation cannot be cancelled
+   */
+  void cancelOperation(OperationHandle opHandle) throws HiveSQLException;
+
+  /**
+   * Closes the operation identified by the handle.
+   *
+   * @param opHandle the handle of the operation to close
+   * @throws HiveSQLException if the operation cannot be closed
+   */
+  void closeOperation(OperationHandle opHandle) throws HiveSQLException;
+
+  /**
+   * @param opHandle the handle of the operation whose result schema is requested
+   * @return the schema of the operation's result set
+   * @throws HiveSQLException if the schema cannot be obtained
+   */
+  TableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException;
+
+  /**
+   * Fetches rows from the result of an operation.
+   *
+   * @param opHandle the handle of the operation to read from
+   * @param orientation the fetch orientation passed to the operation
+   * @param maxRows the row limit passed to the operation
+   * @return the fetched rows
+   * @throws HiveSQLException if the rows cannot be fetched
+   */
+  RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows)
+      throws HiveSQLException;
+
+  /**
+   * Fetches rows from the result of an operation without an explicit
+   * orientation or row limit.
+   *
+   * @param opHandle the handle of the operation to read from
+   * @return the fetched rows
+   * @throws HiveSQLException if the rows cannot be fetched
+   */
+  RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException;
+
+  /**
+   * @return the {@link SessionState} of this session
+   */
+  SessionState getSessionState();
+
+  /**
+   * @return the user name of this session (see also {@link #getUsername()})
+   */
+  String getUserName();
+
+  /**
+   * Replaces the user name of this session.
+   *
+   * @param userName the new user name
+   */
+  void setUserName(String userName);
+}

Added: hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java?rev=1455659&view=auto
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java (added)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java Tue Mar 12 18:22:00 2013
@@ -0,0 +1,344 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.session;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.service.cli.FetchOrientation;
+import org.apache.hive.service.cli.GetInfoType;
+import org.apache.hive.service.cli.GetInfoValue;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.OperationHandle;
+import org.apache.hive.service.cli.RowSet;
+import org.apache.hive.service.cli.SessionHandle;
+import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.operation.ExecuteStatementOperation;
+import org.apache.hive.service.cli.operation.GetCatalogsOperation;
+import org.apache.hive.service.cli.operation.GetColumnsOperation;
+import org.apache.hive.service.cli.operation.GetFunctionsOperation;
+import org.apache.hive.service.cli.operation.GetSchemasOperation;
+import org.apache.hive.service.cli.operation.GetTableTypesOperation;
+import org.apache.hive.service.cli.operation.GetTypeInfoOperation;
+import org.apache.hive.service.cli.operation.MetadataOperation;
+import org.apache.hive.service.cli.operation.OperationManager;
+
+/**
+ * HiveSession
+ *
+ * Basic {@link HiveSession} implementation. It owns a {@link HiveConf}, a
+ * {@link SessionState} built from that configuration and a lazily created
+ * metastore client. Methods that run or manage an operation call
+ * {@link #acquire()} first and {@link #release()} in a {@code finally} block.
+ */
+public class HiveSessionImpl implements HiveSession {
+
+  private final SessionHandle sessionHandle = new SessionHandle();
+  private String username;
+  private final String password;
+  // Never written to in this class: the constructor copies its sessionConf
+  // argument directly into hiveConf instead.
+  private final Map<String, String> sessionConf = new HashMap<String, String>();
+  private final HiveConf hiveConf = new HiveConf();
+  private final SessionState sessionState;
+
+  private static final String FETCH_WORK_SERDE_CLASS =
+      "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
+
+  private SessionManager sessionManager;
+  private OperationManager operationManager;
+  // Created on first use by getMetaStoreClient() and released by close().
+  private IMetaStoreClient metastoreClient = null;
+
+  /**
+   * Creates a session and its {@link SessionState}.
+   *
+   * @param username the user name of the session
+   * @param password the password of the session; only stored and handed back by
+   *          {@link #getPassword()}
+   * @param sessionConf configuration entries copied into the session's
+   *          {@link HiveConf}; may be {@code null}
+   */
+  public HiveSessionImpl(String username, String password, Map<String, String> sessionConf) {
+    this.username = username;
+    this.password = password;
+
+    if (sessionConf != null) {
+      for (Map.Entry<String, String> entry : sessionConf.entrySet()) {
+        hiveConf.set(entry.getKey(), entry.getValue());
+      }
+    }
+
+    sessionState = new SessionState(hiveConf);
+  }
+
+  private SessionManager getSessionManager() {
+    return sessionManager;
+  }
+
+  @Override
+  public void setSessionManager(SessionManager sessionManager) {
+    this.sessionManager = sessionManager;
+  }
+
+  private OperationManager getOperationManager() {
+    return operationManager;
+  }
+
+  @Override
+  public void setOperationManager(OperationManager operationManager) {
+    this.operationManager = operationManager;
+  }
+
+  /**
+   * Called at the start of every operation; passes this session's
+   * {@link SessionState} to {@code SessionState.start}.
+   *
+   * @throws HiveSQLException declared for subclasses; not thrown here
+   */
+  protected synchronized void acquire() throws HiveSQLException {
+    SessionState.start(sessionState);
+  }
+
+  /**
+   * Counterpart of {@link #acquire()}, called from {@code finally} blocks.
+   */
+  protected synchronized void release() {
+    assert sessionState != null;
+    // no need to release sessionState...
+  }
+
+  @Override
+  public SessionHandle getSessionHandle() {
+    return sessionHandle;
+  }
+
+  @Override
+  public String getUsername() {
+    return username;
+  }
+
+  @Override
+  public String getPassword() {
+    return password;
+  }
+
+  /**
+   * Returns the session configuration. Every call first sets
+   * {@code HIVEFETCHOUTPUTSERDE} to the LazySimpleSerDe class name.
+   */
+  @Override
+  public HiveConf getHiveConf() {
+    hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHOUTPUTSERDE, FETCH_WORK_SERDE_CLASS);
+    return hiveConf;
+  }
+
+  /**
+   * Returns the session's metastore client, creating it on first use.
+   *
+   * @throws HiveSQLException wrapping the {@link MetaException} raised while
+   *           creating the client
+   */
+  @Override
+  public IMetaStoreClient getMetaStoreClient() throws HiveSQLException {
+    if (metastoreClient == null) {
+      try {
+        metastoreClient = new HiveMetaStoreClient(getHiveConf());
+      } catch (MetaException e) {
+        throw new HiveSQLException(e);
+      }
+    }
+    return metastoreClient;
+  }
+
+  /**
+   * Answers a getInfo request with a fixed value per supported info type.
+   *
+   * @param getInfoType the kind of information requested
+   * @return the value for the requested type
+   * @throws HiveSQLException for {@code CLI_TXN_CAPABLE} and every type that is
+   *           not listed in the switch
+   */
+  @Override
+  public GetInfoValue getInfo(GetInfoType getInfoType)
+      throws HiveSQLException {
+    acquire();
+    try {
+      switch (getInfoType) {
+      case CLI_SERVER_NAME:
+        return new GetInfoValue("Hive");
+      case CLI_DBMS_NAME:
+        return new GetInfoValue("Apache Hive");
+      case CLI_DBMS_VER:
+        return new GetInfoValue("0.10.0");
+      case CLI_MAX_COLUMN_NAME_LEN:
+        return new GetInfoValue(128);
+      case CLI_MAX_SCHEMA_NAME_LEN:
+        return new GetInfoValue(128);
+      case CLI_MAX_TABLE_NAME_LEN:
+        return new GetInfoValue(128);
+      case CLI_TXN_CAPABLE:
+      default:
+        throw new HiveSQLException("Unrecognized GetInfoType value: " + getInfoType.toString());
+      }
+    } finally {
+      release();
+    }
+  }
+
+  /**
+   * Creates an execute-statement operation through the operation manager, runs
+   * it and returns its handle.
+   */
+  @Override
+  public OperationHandle executeStatement(String statement, Map<String, String> confOverlay)
+      throws HiveSQLException {
+    acquire();
+    try {
+      ExecuteStatementOperation operation = getOperationManager()
+          .newExecuteStatementOperation(getSession(), statement, confOverlay);
+      operation.run();
+      return operation.getHandle();
+    } finally {
+      release();
+    }
+  }
+
+  /**
+   * Creates a getTypeInfo operation, runs it and returns its handle.
+   */
+  @Override
+  public OperationHandle getTypeInfo()
+      throws HiveSQLException {
+    acquire();
+    try {
+      GetTypeInfoOperation operation = getOperationManager().newGetTypeInfoOperation(getSession());
+      operation.run();
+      return operation.getHandle();
+    } finally {
+      release();
+    }
+  }
+
+  /**
+   * Creates a getCatalogs operation, runs it and returns its handle.
+   */
+  @Override
+  public OperationHandle getCatalogs()
+      throws HiveSQLException {
+    acquire();
+    try {
+      GetCatalogsOperation operation = getOperationManager().newGetCatalogsOperation(getSession());
+      operation.run();
+      return operation.getHandle();
+    } finally {
+      release();
+    }
+  }
+
+  /**
+   * Creates a getSchemas operation, runs it and returns its handle.
+   */
+  @Override
+  public OperationHandle getSchemas(String catalogName, String schemaName)
+      throws HiveSQLException {
+    acquire();
+    try {
+      GetSchemasOperation operation =
+          getOperationManager().newGetSchemasOperation(getSession(), catalogName, schemaName);
+      operation.run();
+      return operation.getHandle();
+    } finally {
+      release();
+    }
+  }
+
+  /**
+   * Creates a getTables operation, runs it and returns its handle.
+   */
+  @Override
+  public OperationHandle getTables(String catalogName, String schemaName, String tableName,
+      List<String> tableTypes)
+      throws HiveSQLException {
+    acquire();
+    try {
+      MetadataOperation operation =
+          getOperationManager().newGetTablesOperation(getSession(), catalogName, schemaName, tableName, tableTypes);
+      operation.run();
+      return operation.getHandle();
+    } finally {
+      release();
+    }
+  }
+
+  /**
+   * Creates a getTableTypes operation, runs it and returns its handle.
+   */
+  @Override
+  public OperationHandle getTableTypes()
+      throws HiveSQLException {
+    acquire();
+    try {
+      GetTableTypesOperation operation = getOperationManager().newGetTableTypesOperation(getSession());
+      operation.run();
+      return operation.getHandle();
+    } finally {
+      release();
+    }
+  }
+
+  /**
+   * Creates a getColumns operation, runs it and returns its handle.
+   */
+  @Override
+  public OperationHandle getColumns(String catalogName, String schemaName,
+      String tableName, String columnName)  throws HiveSQLException {
+    acquire();
+    try {
+      GetColumnsOperation operation = getOperationManager().newGetColumnsOperation(getSession(),
+          catalogName, schemaName, tableName, columnName);
+      operation.run();
+      return operation.getHandle();
+    } finally {
+      release();
+    }
+  }
+
+  /**
+   * Creates a getFunctions operation, runs it and returns its handle.
+   */
+  @Override
+  public OperationHandle getFunctions(String catalogName, String schemaName, String functionName)
+      throws HiveSQLException {
+    acquire();
+    try {
+      GetFunctionsOperation operation = getOperationManager()
+          .newGetFunctionsOperation(getSession(), catalogName, schemaName, functionName);
+      operation.run();
+      return operation.getHandle();
+    } finally {
+      release();
+    }
+  }
+
+  /**
+   * Closes the session and, if one was created, its private metastore client.
+   *
+   * @throws HiveSQLException if {@link #acquire()} fails
+   */
+  @Override
+  public void close() throws HiveSQLException {
+    try {
+      acquire();
+      /**
+       *  For metadata operations like getTables(), getColumns() etc,
+       * the session allocates a private metastore handler which should be
+       * closed at the end of the session
+       */
+      if (metastoreClient != null) {
+        metastoreClient.close();
+        // Drop the reference so getMetaStoreClient() never hands out a closed client.
+        metastoreClient = null;
+      }
+    } finally {
+      release();
+    }
+  }
+
+  @Override
+  public SessionState getSessionState() {
+    return sessionState;
+  }
+
+  @Override
+  public String getUserName() {
+    return username;
+  }
+
+  @Override
+  public void setUserName(String userName) {
+    this.username = userName;
+  }
+
+  /**
+   * Cancels an operation through the session manager's operation manager.
+   */
+  @Override
+  public void cancelOperation(OperationHandle opHandle) throws HiveSQLException {
+    acquire();
+    try {
+      sessionManager.getOperationManager().cancelOperation(opHandle);
+    } finally {
+      release();
+    }
+  }
+
+  /**
+   * Closes an operation through the session manager's operation manager.
+   */
+  @Override
+  public void closeOperation(OperationHandle opHandle) throws HiveSQLException {
+    acquire();
+    try {
+      sessionManager.getOperationManager().closeOperation(opHandle);
+    } finally {
+      release();
+    }
+  }
+
+  /**
+   * Looks up the result-set schema of an operation through the session
+   * manager's operation manager.
+   */
+  @Override
+  public TableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException {
+    acquire();
+    try {
+      return sessionManager.getOperationManager().getOperationResultSetSchema(opHandle);
+    } finally {
+      release();
+    }
+  }
+
+  /**
+   * Fetches the next rows of an operation with an explicit orientation and row
+   * limit, through the session manager's operation manager.
+   */
+  @Override
+  public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows)
+      throws HiveSQLException {
+    acquire();
+    try {
+      return sessionManager.getOperationManager()
+          .getOperationNextRowSet(opHandle, orientation, maxRows);
+    } finally {
+      release();
+    }
+  }
+
+  /**
+   * Fetches the next rows of an operation through the session manager's
+   * operation manager.
+   */
+  @Override
+  public RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException {
+    acquire();
+    try {
+      return sessionManager.getOperationManager().getOperationNextRowSet(opHandle);
+    } finally {
+      release();
+    }
+  }
+
+  /**
+   * Returns the session object handed to newly created operations; this class
+   * returns itself, subclasses may substitute another instance.
+   */
+  protected HiveSession getSession() {
+    return this;
+  }
+}

Added: hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java?rev=1455659&view=auto
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java (added)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java Tue Mar 12 18:22:00 2013
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.session;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.service.cli.HiveSQLException;
+
+/**
+ *
+ * HiveSessionImplwithUGI.
+ * HiveSession with connecting user's UGI and delegation token if required
+ */
+public class HiveSessionImplwithUGI extends HiveSessionImpl {
+  public static final String HS2TOKEN = "HiveServer2ImpersonationToken";
+
+  private UserGroupInformation sessionUgi = null;
+  private String delegationTokenStr = null;
+  // Hive object created with the delegation token; stays null when no token was given.
+  private Hive sessionHive = null;
+  private HiveSession proxySession = null;
+
+  /**
+   * Creates the session, then sets up the UGI, the per-user paths and the
+   * delegation token, in that order (the token is attached to the UGI, so the
+   * UGI has to exist first).
+   *
+   * @param username the user to impersonate; must not be {@code null}
+   * @param password the password of the session
+   * @param sessionConf configuration entries for the session; may be {@code null}
+   * @param delegationToken the delegation token string, or {@code null} for none
+   * @throws HiveSQLException if the UGI, the token or the metastore connection
+   *           cannot be set up
+   */
+  public HiveSessionImplwithUGI(String username, String password, Map<String, String> sessionConf,
+      String delegationToken) throws HiveSQLException {
+    super(username, password, sessionConf);
+    setSessionUGI(username);
+    setUserPath(username);
+    setDelegationToken(delegationToken);
+  }
+
+  /**
+   * Sets up the UGI for the session: a proxy user when security is enabled, a
+   * remote user otherwise.
+   *
+   * @param owner the user name the UGI is created for
+   * @throws HiveSQLException if {@code owner} is {@code null} or the proxy user
+   *           cannot be created
+   */
+  public void setSessionUGI(String owner) throws HiveSQLException {
+    if (owner == null) {
+      throw new HiveSQLException("No username provided for impersonation");
+    }
+    if (ShimLoader.getHadoopShims().isSecurityEnabled()) {
+      try {
+        sessionUgi = ShimLoader.getHadoopShims().createProxyUser(owner);
+      } catch (IOException e) {
+        throw new HiveSQLException("Couldn't setup proxy user", e);
+      }
+    } else {
+      sessionUgi = ShimLoader.getHadoopShims().createRemoteUser(owner, null);
+    }
+  }
+
+  public UserGroupInformation getSessionUgi() {
+    return this.sessionUgi;
+  }
+
+  public String getDelegationToken () {
+    return this.delegationTokenStr;
+  }
+
+  @Override
+  protected synchronized void acquire() throws HiveSQLException {
+    super.acquire();
+    // if we have a metastore connection with impersonation, then set it first
+    if (sessionHive != null) {
+      Hive.set(sessionHive);
+    }
+  }
+
+  /**
+   * close the file systems for the session
+   * cancel the session's delegation token and close the metastore connection
+   *
+   * The superclass {@code close()} runs from the {@code finally} block, so it is
+   * reached even when cancelling the token fails.
+   */
+  @Override
+  public void close() throws HiveSQLException {
+    try {
+      acquire();
+      ShimLoader.getHadoopShims().closeAllForUGI(sessionUgi);
+      cancelDelegationToken();
+    } finally {
+      release();
+      super.close();
+    }
+  }
+
+  /**
+   * Enable delegation token for the session
+   * save the token string and set the token.signature in hive conf. The metastore client uses
+   * this token.signature to determine where to use kerberos or delegation token
+   *
+   * @param delegationTokenStr the token string; nothing is configured when {@code null}
+   * @throws HiveSQLException if the token cannot be attached to the UGI or the
+   *           metastore connection cannot be created
+   */
+  private void setDelegationToken(String delegationTokenStr) throws HiveSQLException {
+    this.delegationTokenStr = delegationTokenStr;
+    if (delegationTokenStr != null) {
+      getHiveConf().set("hive.metastore.token.signature", HS2TOKEN);
+      try {
+        ShimLoader.getHadoopShims().setTokenStr(sessionUgi, delegationTokenStr, HS2TOKEN);
+      } catch (IOException e) {
+        throw new HiveSQLException("Couldn't setup delegation token in the ugi", e);
+      }
+      // create a new metastore connection using the delegation token
+      Hive.set(null);
+      try {
+        sessionHive = Hive.get(getHiveConf());
+      } catch (HiveException e) {
+        throw new HiveSQLException("Failed to setup metastore connection", e);
+      }
+    }
+  }
+
+  // If the session has a delegation token obtained from the metastore, then cancel it
+  private void cancelDelegationToken() throws HiveSQLException {
+    if (delegationTokenStr != null) {
+      try {
+        Hive.get(getHiveConf()).cancelDelegationToken(delegationTokenStr);
+      } catch (HiveException e) {
+        throw new HiveSQLException("Couldn't cancel delegation token", e);
+      }
+      // close the metastore connection created with this delegation token
+      Hive.closeCurrent();
+    }
+  }
+
+  /**
+   * Append the user name to temp/scratch directory path for each impersonated user.
+   * Every variable in {@code HiveConf.userVars} that has a value is rewritten to
+   * {@code <value>-<userName>}, after one trailing path separator is dropped.
+   *
+   * @param userName the name appended to each path
+   */
+  private void setUserPath(String userName) {
+    for (HiveConf.ConfVars var: HiveConf.userVars) {
+      String userVar = getHiveConf().getVar(var);
+      if (userVar != null) {
+        // If there's a path separator at end then remove it. Only the separator
+        // itself is cut, so the directory name keeps its last character.
+        if (userVar.endsWith(File.separator)) {
+          userVar = userVar.substring(0, userVar.length() - File.separator.length());
+        }
+        getHiveConf().setVar(var, userVar + "-" + userName);
+      }
+    }
+  }
+
+  /**
+   * Returns the proxy registered through {@link #setProxySession(HiveSession)}
+   * instead of {@code this}, so operations receive the proxy as their session.
+   */
+  @Override
+  protected HiveSession getSession() {
+    assert proxySession != null;
+
+    return proxySession;
+  }
+
+  public void setProxySession(HiveSession proxySession) {
+    this.proxySession = proxySession;
+  }
+
+
+}

Added: hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionProxy.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionProxy.java?rev=1455659&view=auto
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionProxy.java (added)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionProxy.java Tue Mar 12 18:22:00 2013
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.session;
+
+/**
+ * Proxy wrapper on HiveSession to execute operations
+ * by impersonating given user
+ */
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.service.cli.HiveSQLException;
+
+/**
+ * Invocation handler for a dynamic {@link HiveSession} proxy. Each method
+ * called on the proxy is invoked reflectively on the wrapped session from
+ * inside the Hadoop shims' {@code doAs} for the supplied UGI.
+ */
+public class HiveSessionProxy implements InvocationHandler {
+  private final HiveSession base;
+  private final UserGroupInformation ugi;
+
+  /**
+   * @param hiveSession the session every call is forwarded to
+   * @param ugi the UGI passed to {@code doAs} for every call
+   */
+  public HiveSessionProxy(HiveSession hiveSession, UserGroupInformation ugi) {
+    this.base = hiveSession;
+    this.ugi = ugi;
+  }
+
+  /**
+   * Builds a {@link HiveSession} proxy backed by a new {@code HiveSessionProxy}.
+   *
+   * @param hiveSession the session to wrap
+   * @param ugi the UGI to run the wrapped session's methods as
+   * @return a proxy implementing {@link HiveSession}
+   */
+  public static HiveSession getProxy(HiveSession hiveSession, UserGroupInformation ugi)
+      throws IllegalArgumentException, HiveSQLException {
+    ClassLoader loader = HiveSession.class.getClassLoader();
+    Class<?>[] proxiedInterfaces = new Class<?>[] {HiveSession.class};
+    InvocationHandler handler = new HiveSessionProxy(hiveSession, ugi);
+    return (HiveSession) Proxy.newProxyInstance(loader, proxiedInterfaces, handler);
+  }
+
+  /**
+   * Runs {@code method} on the wrapped session inside {@code doAs}.
+   * A {@link HiveSQLException} raised by the target method is rethrown as is;
+   * any other target exception, and reflection failures, are wrapped in a
+   * {@link RuntimeException}. An {@link UndeclaredThrowableException} coming out
+   * of {@code doAs} is unwrapped one level, or two when it carries a
+   * {@link PrivilegedActionException}.
+   */
+  @Override
+  public Object invoke(Object arg0, final Method method, final Object[] args)
+      throws Throwable {
+    final PrivilegedExceptionAction<Object> delegateCall =
+        new PrivilegedExceptionAction<Object>() {
+          @Override
+          public Object run() throws HiveSQLException {
+            try {
+              return method.invoke(base, args);
+            } catch (InvocationTargetException e) {
+              Throwable targetFailure = e.getCause();
+              if (targetFailure instanceof HiveSQLException) {
+                throw (HiveSQLException) targetFailure;
+              }
+              throw new RuntimeException(targetFailure);
+            } catch (IllegalArgumentException e) {
+              throw new RuntimeException(e);
+            } catch (IllegalAccessException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        };
+
+    try {
+      return ShimLoader.getHadoopShims().doAs(ugi, delegateCall);
+    } catch (UndeclaredThrowableException e) {
+      Throwable wrapped = e.getCause();
+      if (wrapped instanceof PrivilegedActionException) {
+        throw wrapped.getCause();
+      }
+      throw wrapped;
+    }
+  }
+
+}
+

Added: hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java?rev=1455659&view=auto
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java (added)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java Tue Mar 12 18:22:00 2013
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.session;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.service.CompositeService;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.SessionHandle;
+import org.apache.hive.service.cli.operation.OperationManager;
+
+/**
+ * SessionManager.
+ *
+ * Composite service that creates {@link HiveSession} objects, keeps them in a
+ * map keyed by {@link SessionHandle} and hands the same
+ * {@link OperationManager} to each of them.
+ */
+public class SessionManager extends CompositeService {
+
+  // Guards every access to handleToSession; static, hence shared by all instances.
+  private static final Object sessionMapLock = new Object();
+
+  // IP address recorded for the current thread; null until setIpAddress is called.
+  private static ThreadLocal<String> threadLocalIpAddress = new ThreadLocal<String>() {
+    @Override
+    protected synchronized String initialValue() {
+      return null;
+    }
+  };
+
+  // User name recorded for the current thread; null until setUserName is called.
+  private static ThreadLocal<String> threadLocalUserName = new ThreadLocal<String>(){
+    @Override
+    protected synchronized String initialValue() {
+      return null;
+    }
+  };
+
+  private HiveConf hiveConf;
+  private final Map<SessionHandle, HiveSession> handleToSession = new HashMap<SessionHandle, HiveSession>();
+  private OperationManager operationManager = new OperationManager();
+
+  public SessionManager() {
+    super("SessionManager");
+  }
+
+  /**
+   * Stores the configuration, replaces the operation manager with a new
+   * instance registered as a child service, then initialises the composite.
+   */
+  @Override
+  public synchronized void init(HiveConf hiveConf) {
+    this.hiveConf = hiveConf;
+
+    operationManager = new OperationManager();
+    addService(operationManager);
+
+    super.init(hiveConf);
+  }
+
+  @Override
+  public synchronized void start() {
+    super.start();
+    // TODO
+  }
+
+  @Override
+  public synchronized void stop() {
+    // TODO
+    super.stop();
+  }
+
+  /**
+   * Opens a session without impersonation and without a delegation token.
+   *
+   * @return the handle of the new session
+   */
+  public SessionHandle openSession(String username, String password, Map<String, String> sessionConf)
+          throws HiveSQLException {
+    return openSession(username, password, sessionConf, false, null);
+  }
+
+  /**
+   * Opens a session and registers it under its handle.
+   *
+   * @param username the session user; when {@code null} the name recorded for
+   *          the calling thread through {@link #setUserName(String)} is used
+   * @param password the session password
+   * @param sessionConf configuration entries for the session
+   * @param withImpersonation when {@code true} the session is a
+   *          {@link HiveSessionImplwithUGI} wrapped in a {@link HiveSessionProxy}
+   * @param delegationToken the delegation token for an impersonating session
+   * @return the handle of the new session
+   * @throws HiveSQLException if the session cannot be created
+   */
+  public SessionHandle openSession(String username, String password, Map<String, String> sessionConf,
+          boolean withImpersonation, String delegationToken) throws HiveSQLException {
+    String effectiveUser = (username != null) ? username : threadLocalUserName.get();
+
+    HiveSession opened;
+    if (!withImpersonation) {
+      opened = new HiveSessionImpl(effectiveUser, password, sessionConf);
+    } else {
+      HiveSessionImplwithUGI ugiSession =
+          new HiveSessionImplwithUGI(effectiveUser, password, sessionConf, delegationToken);
+      opened = (HiveSession) HiveSessionProxy.getProxy(ugiSession, ugiSession.getSessionUgi());
+      // operations created by the UGI session must see the proxy, not the raw session
+      ugiSession.setProxySession(opened);
+    }
+
+    opened.setSessionManager(this);
+    opened.setOperationManager(operationManager);
+    synchronized (sessionMapLock) {
+      handleToSession.put(opened.getSessionHandle(), opened);
+    }
+    return opened.getSessionHandle();
+  }
+
+  /**
+   * Unregisters the session and closes it.
+   *
+   * @throws HiveSQLException if no session is registered under the handle, or
+   *           closing it fails
+   */
+  public void closeSession(SessionHandle sessionHandle) throws HiveSQLException {
+    HiveSession removed;
+    synchronized (sessionMapLock) {
+      removed = handleToSession.remove(sessionHandle);
+    }
+    if (removed != null) {
+      removed.close();
+      return;
+    }
+    throw new HiveSQLException("Session does not exist!");
+  }
+
+  /**
+   * Looks up a registered session.
+   *
+   * @throws HiveSQLException if no session is registered under the handle
+   */
+  public HiveSession getSession(SessionHandle sessionHandle) throws HiveSQLException {
+    HiveSession found;
+    synchronized (sessionMapLock) {
+      found = handleToSession.get(sessionHandle);
+    }
+    if (found != null) {
+      return found;
+    }
+    throw new HiveSQLException("Invalid SessionHandle: " + sessionHandle);
+  }
+
+  public OperationManager getOperationManager() {
+    return operationManager;
+  }
+
+  /** Records the IP address for the calling thread. */
+  public static void setIpAddress(String ipAddress) {
+    threadLocalIpAddress.set(ipAddress);
+  }
+
+  private void clearIpAddress() {
+    threadLocalIpAddress.remove();
+  }
+
+  /** Records the user name for the calling thread. */
+  public static void setUserName(String userName) {
+    threadLocalUserName.set(userName);
+  }
+
+  private void clearUserName() {
+    threadLocalUserName.remove();
+  }
+
+}

Added: hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftCLIService.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftCLIService.java?rev=1455659&view=auto
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftCLIService.java (added)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftCLIService.java Tue Mar 12 18:22:00 2013
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.thrift;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.service.cli.CLIService;
+
+
+/**
+ * EmbeddedThriftCLIService.
+ *
+ * A {@link ThriftCLIService} that owns a freshly created {@link CLIService}
+ * and marks itself as embedded.
+ */
+public class EmbeddedThriftCLIService extends ThriftCLIService {
+
+  /**
+   * Creates the wrapped {@link CLIService}, flags this instance as embedded,
+   * then initializes the wrapped service with a new {@link HiveConf} and
+   * starts it.
+   */
+  public EmbeddedThriftCLIService() {
+    super(new CLIService());
+    this.isEmbedded = true;
+    HiveConf embeddedConf = new HiveConf();
+    this.cliService.init(embeddedConf);
+    this.cliService.start();
+  }
+}

Added: hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java?rev=1455659&view=auto
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java (added)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java Tue Mar 12 18:22:00 2013
@@ -0,0 +1,423 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.thrift;
+
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hive.service.AbstractService;
+import org.apache.hive.service.auth.HiveAuthFactory;
+import org.apache.hive.service.cli.FetchOrientation;
+import org.apache.hive.service.cli.GetInfoType;
+import org.apache.hive.service.cli.GetInfoValue;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.OperationHandle;
+import org.apache.hive.service.cli.OperationState;
+import org.apache.hive.service.cli.RowSet;
+import org.apache.hive.service.cli.CLIService;
+import org.apache.hive.service.cli.SessionHandle;
+import org.apache.hive.service.cli.TableSchema;
+import org.apache.thrift.TException;
+import org.apache.thrift.TProcessorFactory;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TTransportFactory;
+
+
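+/**
+ * ThriftCLIService.
+ *
+ * Implements the Thrift {@code TCLIService.Iface} by delegating each call to
+ * the wrapped {@code CLIService}.
+ */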
+public class ThriftCLIService extends AbstractService implements TCLIService.Iface, Runnable {
+
+  /** Logger for this class. */
+  public static final Log LOG = LogFactory.getLog(ThriftCLIService.class.getName());
+
+
+  /** Service that every Thrift call in this class delegates to. */
+  protected CLIService cliService;
+  /** Shared success status attached to every successful response. */
+  private static final TStatus OK_STATUS = new TStatus(TStatusCode.SUCCESS_STATUS);
+  /** Generic error status; not referenced by the methods of this class. */
+  private static final TStatus ERROR_STATUS = new TStatus(TStatusCode.ERROR_STATUS);
+
+  /** Created in run(); remains null until then (OpenSession checks for null). */
+  private static HiveAuthFactory hiveAuthFactory;
+
+  /** Listening port, resolved in run() from the environment or HiveConf. */
+  private int portNum;
+  /** Socket address the Thrift server binds to, built in run(). */
+  private InetSocketAddress serverAddress;
+  /** Thrift server instance; assigned in run() just before serving. */
+  private TServer server;
+
+  /** True once start() has spawned the server thread; reset by stop(). */
+  private boolean isStarted = false;
+  /** When true, start() and stop() skip the Thrift server thread entirely. */
+  protected boolean isEmbedded = false;
+
+  /** Configuration captured in init(). */
+  private HiveConf hiveConf;
+
+  /** Thread-pool bounds for the Thrift server, read from HiveConf in run(). */
+  private int minWorkerThreads;
+  private int maxWorkerThreads;
+
+
+
+  /**
+   * Creates the service under the name "ThriftCLIService".
+   *
+   * @param cliService service that all Thrift calls are delegated to
+   */
+  public ThriftCLIService(CLIService cliService) {
+    super("ThriftCLIService");
+    this.cliService = cliService;
+  }
+
+  /**
+   * Remembers the configuration for later use by run() and then delegates to
+   * the superclass initialization.
+   *
+   * @param hiveConf configuration to initialize with
+   */
+  @Override
+  public synchronized void init(HiveConf hiveConf) {
+    this.hiveConf = hiveConf;
+    super.init(hiveConf);
+  }
+
+  /**
+   * Starts the service. After the superclass start, a new thread executing
+   * {@link #run()} is launched unless the service is already started or is
+   * running in embedded mode.
+   */
+  @Override
+  public synchronized void start() {
+    super.start();
+    if (isStarted || isEmbedded) {
+      return;
+    }
+    Thread serverThread = new Thread(this);
+    serverThread.start();
+    isStarted = true;
+  }
+
+  /**
+   * Stops the Thrift server (when one was started in non-embedded mode) and
+   * then delegates to the superclass stop.
+   */
+  @Override
+  public synchronized void stop() {
+    if (isStarted && !isEmbedded) {
+      // run() assigns 'server' on its own thread only after the socket has
+      // been set up, and it swallows start-up failures, so 'server' can
+      // still be null here. Guard to avoid an NPE that would skip super.stop().
+      if (server != null) {
+        server.stop();
+      }
+      isStarted = false;
+    }
+    super.stop();
+  }
+
+
+  /**
+   * Opens a new session. The user name comes from the auth factory's remote
+   * user when available, otherwise from the request. When
+   * HIVE_SERVER2_KERBEROS_IMPERSONATION is enabled the session is opened with
+   * impersonation, passing a metastore delegation token if one can be obtained.
+   *
+   * @param req open-session request
+   * @return response carrying the session handle and OK status, or an error
+   *         status derived from the failure
+   */
+  @Override
+  public TOpenSessionResp OpenSession(TOpenSessionReq req) throws TException {
+    TOpenSessionResp resp = new TOpenSessionResp();
+    try {
+      // hiveAuthFactory is only created in run(), so it may be null here.
+      String remoteUser = (hiveAuthFactory != null) ? hiveAuthFactory.getRemoteUser() : null;
+      String userName = (remoteUser != null) ? remoteUser : req.getUsername();
+
+      SessionHandle sessionHandle;
+      if (cliService.getHiveConf().
+          getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_IMPERSONATION)) {
+        String delegationTokenStr = null;
+        try {
+          delegationTokenStr = cliService.getDelegationTokenFromMetaStore(userName);
+        } catch (UnsupportedOperationException e) {
+          // The delegation token is not applicable in the given deployment mode
+        }
+        sessionHandle = cliService.openSessionWithImpersonation(userName, req.getPassword(),
+              req.getConfiguration(), delegationTokenStr);
+      } else {
+        sessionHandle = cliService.openSession(userName, req.getPassword(),
+              req.getConfiguration());
+      }
+      resp.setSessionHandle(sessionHandle.toTSessionHandle());
+      // TODO: set real configuration map
+      resp.setConfiguration(new HashMap<String, String>());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      // Report through the configured logger rather than raw stderr.
+      LOG.warn("Error opening session: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  /**
+   * Closes the session identified by the request's session handle.
+   *
+   * @param req close-session request
+   * @return response with OK status, or an error status derived from the failure
+   */
+  @Override
+  public TCloseSessionResp CloseSession(TCloseSessionReq req) throws TException {
+    TCloseSessionResp resp = new TCloseSessionResp();
+    try {
+      SessionHandle sessionHandle = new SessionHandle(req.getSessionHandle());
+      cliService.closeSession(sessionHandle);
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      // Report through the configured logger rather than raw stderr.
+      LOG.warn("Error closing session: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  /**
+   * Fetches the info value of the requested type for the given session.
+   *
+   * @param req get-info request (session handle and info type)
+   * @return response with the info value and OK status, or an error status
+   *         derived from the failure
+   */
+  @Override
+  public TGetInfoResp GetInfo(TGetInfoReq req) throws TException {
+    TGetInfoResp resp = new TGetInfoResp();
+    try {
+      GetInfoValue getInfoValue =
+          cliService.getInfo(new SessionHandle(req.getSessionHandle()),
+              GetInfoType.getGetInfoType(req.getInfoType()));
+      resp.setInfoValue(getInfoValue.toTGetInfoValue());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      // Report through the configured logger rather than raw stderr.
+      LOG.warn("Error getting info: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  /**
+   * Executes a statement in the given session with the request's
+   * configuration overlay.
+   *
+   * @param req execute-statement request
+   * @return response with the operation handle and OK status, or an error
+   *         status derived from the failure
+   */
+  @Override
+  public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws TException {
+    TExecuteStatementResp resp = new TExecuteStatementResp();
+    try {
+      SessionHandle sessionHandle = new SessionHandle(req.getSessionHandle());
+      String statement = req.getStatement();
+      Map<String, String> confOverlay = req.getConfOverlay();
+      OperationHandle operationHandle =
+          cliService.executeStatement(sessionHandle, statement, confOverlay);
+      resp.setOperationHandle(operationHandle.toTOperationHandle());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      // Report through the configured logger rather than raw stderr.
+      LOG.warn("Error executing statement: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  /**
+   * Starts a type-info operation for the given session.
+   *
+   * @param req get-type-info request
+   * @return response with the operation handle and OK status, or an error
+   *         status derived from the failure
+   */
+  @Override
+  public TGetTypeInfoResp GetTypeInfo(TGetTypeInfoReq req) throws TException {
+    TGetTypeInfoResp resp = new TGetTypeInfoResp();
+    try {
+      OperationHandle operationHandle = cliService.getTypeInfo(new SessionHandle(req.getSessionHandle()));
+      resp.setOperationHandle(operationHandle.toTOperationHandle());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      // Report through the configured logger rather than raw stderr.
+      LOG.warn("Error getting type info: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  /**
+   * Starts a get-catalogs operation for the given session.
+   *
+   * @param req get-catalogs request
+   * @return response with the operation handle and OK status, or an error
+   *         status derived from the failure
+   */
+  @Override
+  public TGetCatalogsResp GetCatalogs(TGetCatalogsReq req) throws TException {
+    TGetCatalogsResp resp = new TGetCatalogsResp();
+    try {
+      OperationHandle opHandle = cliService.getCatalogs(new SessionHandle(req.getSessionHandle()));
+      resp.setOperationHandle(opHandle.toTOperationHandle());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      // Report through the configured logger rather than raw stderr.
+      LOG.warn("Error getting catalogs: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  /**
+   * Starts a get-schemas operation using the request's catalog and schema names.
+   *
+   * @param req get-schemas request
+   * @return response with the operation handle and OK status, or an error
+   *         status derived from the failure
+   */
+  @Override
+  public TGetSchemasResp GetSchemas(TGetSchemasReq req) throws TException {
+    TGetSchemasResp resp = new TGetSchemasResp();
+    try {
+      OperationHandle opHandle = cliService.getSchemas(
+          new SessionHandle(req.getSessionHandle()), req.getCatalogName(), req.getSchemaName());
+      resp.setOperationHandle(opHandle.toTOperationHandle());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      // Report through the configured logger rather than raw stderr.
+      LOG.warn("Error getting schemas: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  /**
+   * Starts a get-tables operation using the request's catalog, schema and
+   * table names and table types.
+   *
+   * @param req get-tables request
+   * @return response with the operation handle and OK status, or an error
+   *         status derived from the failure
+   */
+  @Override
+  public TGetTablesResp GetTables(TGetTablesReq req) throws TException {
+    TGetTablesResp resp = new TGetTablesResp();
+    try {
+      OperationHandle opHandle = cliService
+          .getTables(new SessionHandle(req.getSessionHandle()), req.getCatalogName(),
+              req.getSchemaName(), req.getTableName(), req.getTableTypes());
+      resp.setOperationHandle(opHandle.toTOperationHandle());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      // Report through the configured logger rather than raw stderr.
+      LOG.warn("Error getting tables: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  /**
+   * Starts a get-table-types operation for the given session.
+   *
+   * @param req get-table-types request
+   * @return response with the operation handle and OK status, or an error
+   *         status derived from the failure
+   */
+  @Override
+  public TGetTableTypesResp GetTableTypes(TGetTableTypesReq req) throws TException {
+    TGetTableTypesResp resp = new TGetTableTypesResp();
+    try {
+      OperationHandle opHandle = cliService.getTableTypes(new SessionHandle(req.getSessionHandle()));
+      resp.setOperationHandle(opHandle.toTOperationHandle());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      // Report through the configured logger rather than raw stderr.
+      LOG.warn("Error getting table types: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  /**
+   * Starts a get-columns operation using the request's catalog, schema, table
+   * and column names.
+   *
+   * @param req get-columns request
+   * @return response with the operation handle and OK status, or an error
+   *         status derived from the failure
+   */
+  @Override
+  public TGetColumnsResp GetColumns(TGetColumnsReq req) throws TException {
+    TGetColumnsResp resp = new TGetColumnsResp();
+    try {
+      OperationHandle opHandle = cliService.getColumns(
+          new SessionHandle(req.getSessionHandle()),
+          req.getCatalogName(),
+          req.getSchemaName(),
+          req.getTableName(),
+          req.getColumnName());
+      resp.setOperationHandle(opHandle.toTOperationHandle());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      // Report through the configured logger rather than raw stderr.
+      LOG.warn("Error getting columns: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  /**
+   * Starts a get-functions operation using the request's catalog, schema and
+   * function names.
+   *
+   * @param req get-functions request
+   * @return response with the operation handle and OK status, or an error
+   *         status derived from the failure
+   */
+  @Override
+  public TGetFunctionsResp GetFunctions(TGetFunctionsReq req) throws TException {
+    TGetFunctionsResp resp = new TGetFunctionsResp();
+    try {
+      OperationHandle opHandle = cliService.getFunctions(
+          new SessionHandle(req.getSessionHandle()), req.getCatalogName(),
+          req.getSchemaName(), req.getFunctionName());
+      resp.setOperationHandle(opHandle.toTOperationHandle());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      // Report through the configured logger rather than raw stderr.
+      LOG.warn("Error getting functions: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  /**
+   * Reports the current state of the operation identified by the request.
+   *
+   * @param req get-operation-status request
+   * @return response with the operation state and OK status, or an error
+   *         status derived from the failure
+   */
+  @Override
+  public TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq req) throws TException {
+    TGetOperationStatusResp resp = new TGetOperationStatusResp();
+    try {
+      OperationState operationState = cliService.getOperationStatus(new OperationHandle(req.getOperationHandle()));
+      resp.setOperationState(operationState.toTOperationState());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      // Report through the configured logger rather than raw stderr.
+      LOG.warn("Error getting operation status: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  /**
+   * Cancels the operation identified by the request.
+   *
+   * @param req cancel-operation request
+   * @return response with OK status, or an error status derived from the failure
+   */
+  @Override
+  public TCancelOperationResp CancelOperation(TCancelOperationReq req) throws TException {
+    TCancelOperationResp resp = new TCancelOperationResp();
+    try {
+      cliService.cancelOperation(new OperationHandle(req.getOperationHandle()));
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      // Report through the configured logger rather than raw stderr.
+      LOG.warn("Error cancelling operation: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  /**
+   * Closes the operation identified by the request.
+   *
+   * @param req close-operation request
+   * @return response with OK status, or an error status derived from the failure
+   */
+  @Override
+  public TCloseOperationResp CloseOperation(TCloseOperationReq req) throws TException {
+    TCloseOperationResp resp = new TCloseOperationResp();
+    try {
+      cliService.closeOperation(new OperationHandle(req.getOperationHandle()));
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      // Report through the configured logger rather than raw stderr.
+      LOG.warn("Error closing operation: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  /**
+   * Fetches the result-set schema of the operation identified by the request.
+   *
+   * @param req get-result-set-metadata request
+   * @return response with the table schema and OK status, or an error status
+   *         derived from the failure
+   */
+  @Override
+  public TGetResultSetMetadataResp GetResultSetMetadata(TGetResultSetMetadataReq req)
+      throws TException {
+    TGetResultSetMetadataResp resp = new TGetResultSetMetadataResp();
+    try {
+      TableSchema schema = cliService.getResultSetMetadata(new OperationHandle(req.getOperationHandle()));
+      resp.setSchema(schema.toTTableSchema());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      // Report through the configured logger rather than raw stderr.
+      LOG.warn("Error getting result set metadata: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  /**
+   * Fetches up to the requested number of rows from the operation identified
+   * by the request, using the request's fetch orientation.
+   *
+   * @param req fetch-results request
+   * @return response with the row set and OK status, or an error status
+   *         derived from the failure; {@code hasMoreRows} is always set to false
+   */
+  @Override
+  public TFetchResultsResp FetchResults(TFetchResultsReq req) throws TException {
+    TFetchResultsResp resp = new TFetchResultsResp();
+    try {
+      RowSet rowSet = cliService.fetchResults(
+          new OperationHandle(req.getOperationHandle()),
+          FetchOrientation.getFetchOrientation(req.getOrientation()),
+          req.getMaxRows());
+      resp.setResults(rowSet.toTRowSet());
+      resp.setHasMoreRows(false);
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      // Report through the configured logger rather than raw stderr.
+      LOG.warn("Error fetching results: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+
+  /**
+   * Body of the server thread launched by start(): builds the auth factory,
+   * resolves port and bind host (environment variables take precedence over
+   * HiveConf), creates a TThreadPoolServer and blocks in serve().
+   * Any Throwable raised along the way is logged and the thread ends.
+   */
+  @Override
+  public void run() {
+    try {
+      hiveAuthFactory = new HiveAuthFactory();
+      TTransportFactory  transportFactory = hiveAuthFactory.getAuthTransFactory();
+      TProcessorFactory processorFactory = hiveAuthFactory.getAuthProcFactory(this);
+
+      String portString = System.getenv("HIVE_SERVER2_THRIFT_PORT");
+      if (portString != null) {
+        // parseInt yields the primitive directly; valueOf would box and unbox.
+        portNum = Integer.parseInt(portString);
+      } else {
+        portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT);
+      }
+
+      String hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST");
+      if (hiveHost == null) {
+        hiveHost = hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
+      }
+
+      // No host configured: bind to the wildcard address on the chosen port.
+      if (hiveHost != null && !hiveHost.isEmpty()) {
+        serverAddress = new InetSocketAddress(hiveHost, portNum);
+      } else {
+        serverAddress = new  InetSocketAddress(portNum);
+      }
+
+
+      minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS);
+      maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS);
+
+
+      TThreadPoolServer.Args sargs = new TThreadPoolServer.Args(new TServerSocket(serverAddress))
+      .processorFactory(processorFactory)
+      .transportFactory(transportFactory)
+      .protocolFactory(new TBinaryProtocol.Factory())
+      .minWorkerThreads(minWorkerThreads)
+      .maxWorkerThreads(maxWorkerThreads);
+
+      server = new TThreadPoolServer(sargs);
+
+      LOG.info("ThriftCLIService listening on " + serverAddress);
+
+      server.serve();
+    } catch (Throwable t) {
+      // Covers both start-up failures and an abnormal exit from serve();
+      // send it to the configured logger instead of raw stderr.
+      LOG.error("ThriftCLIService failed to start or terminated abnormally", t);
+    }
+  }
+
+}