You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by br...@apache.org on 2014/08/27 22:44:07 UTC
svn commit: r1620976 [2/2] - in /hive/trunk:
common/src/java/org/apache/hadoop/hive/conf/ service/if/
service/src/gen/thrift/gen-cpp/
service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/
service/src/gen/thrift/gen-py/TCLIService/ ser...
Added: hive/trunk/service/src/test/org/apache/hive/service/cli/operation/TestOperationLoggingAPI.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/test/org/apache/hive/service/cli/operation/TestOperationLoggingAPI.java?rev=1620976&view=auto
==============================================================================
--- hive/trunk/service/src/test/org/apache/hive/service/cli/operation/TestOperationLoggingAPI.java (added)
+++ hive/trunk/service/src/test/org/apache/hive/service/cli/operation/TestOperationLoggingAPI.java Wed Aug 27 20:44:06 2014
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.service.cli.operation;
+
+import org.junit.Assert;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.service.cli.*;
+import org.apache.hive.service.cli.thrift.EmbeddedThriftBinaryCLIService;
+import org.apache.hive.service.cli.thrift.ThriftCLIServiceClient;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+
+/**
+ * TestOperationLoggingAPI
+ * Tests fetching operation logs (FetchType.LOG) through the Thrift CLI service client,
+ * using an embedded Thrift service and a small test table loaded from kv1.txt.
+ */
+public class TestOperationLoggingAPI {
+  /** Maximum number of rows requested by the log fetches that do not page explicitly. */
+  private static final int MAX_LOG_ROWS = 1000;
+  /** How long, in milliseconds, the async test polls before giving up. */
+  private static final long POLL_TIMEOUT_MS = 100000;
+  /** Pause, in milliseconds, between two polls of the operation status. */
+  private static final long POLL_INTERVAL_MS = 10;
+
+  private final HiveConf hiveConf = new HiveConf();
+  private final String tableName = "testOperationLoggingAPI_table";
+  private File dataFile;
+  private ThriftCLIServiceClient client;
+  private SessionHandle sessionHandle;
+  private final String sql = "select * from " + tableName;
+  // Fragments that every fetched log of the test query is expected to contain.
+  private final String[] expectedLogs = {
+    "Parsing command",
+    "Parse Completed",
+    "Starting Semantic Analysis",
+    "Semantic Analysis Completed",
+    "Starting command"
+  };
+
+  /**
+   * Creates the embedded Thrift CLI service, wraps it in a client, and prepares the shared
+   * session and the test table used by the test cases.
+   * @throws Exception if the service, the session or the test table cannot be set up
+   */
+  @Before
+  public void setUp() throws Exception {
+    dataFile = new File(hiveConf.get("test.data.files"), "kv1.txt");
+    EmbeddedThriftBinaryCLIService service = new EmbeddedThriftBinaryCLIService();
+    service.init(hiveConf);
+    client = new ThriftCLIServiceClient(service);
+    sessionHandle = setupSession();
+  }
+
+  /**
+   * Drops the test table and closes the shared session.
+   * @throws Exception if dropping the table or closing the session fails
+   */
+  @After
+  public void tearDown() throws Exception {
+    // JUnit runs @After methods even when a @Before method failed, so setUp may not have
+    // got as far as creating the client or the session; there is nothing to clean up then.
+    if (client == null || sessionHandle == null) {
+      return;
+    }
+    try {
+      client.executeStatement(sessionHandle, "DROP TABLE " + tableName, null);
+    } finally {
+      // Close the session even if dropping the table failed.
+      client.closeSession(sessionHandle);
+    }
+  }
+
+  /**
+   * Verifies that the operation log of a synchronously executed statement is generated and
+   * can be fetched.
+   */
+  @Test
+  public void testFetchResultsOfLog() throws Exception {
+    OperationHandle operationHandle = client.executeStatement(sessionHandle, sql, null);
+    verifyFetchedLog(fetchLogFromStart(operationHandle));
+  }
+
+  /**
+   * Verifies that the log of an asynchronously executed statement can be accumulated with
+   * FETCH_NEXT while polling its status, and read again from the beginning with FETCH_FIRST.
+   */
+  @Test
+  public void testFetchResultsOfLogAsync() throws Exception {
+    OperationHandle operationHandle = client.executeStatementAsync(sessionHandle, sql, null);
+    long deadline = System.currentTimeMillis() + POLL_TIMEOUT_MS;
+    OperationState state = null;
+    StringBuilder logs = new StringBuilder();
+
+    // Poll until the operation reaches a terminal state or the deadline passes.
+    while (System.currentTimeMillis() <= deadline) {
+      OperationStatus opStatus = client.getOperationStatus(operationHandle);
+      Assert.assertNotNull(opStatus);
+      state = opStatus.getState();
+
+      // The fetch happens after the state read, so observing a terminal state is always
+      // followed by one more fetch before the loop exits.
+      RowSet newRows = client.fetchResults(operationHandle, FetchOrientation.FETCH_NEXT,
+          MAX_LOG_ROWS, FetchType.LOG);
+      for (Object[] row : newRows) {
+        logs.append(row[0]);
+      }
+
+      if (isTerminal(state)) {
+        break;
+      }
+      Thread.sleep(POLL_INTERVAL_MS);
+    }
+    // Fails here both when the query ended in another state and when polling timed out.
+    Assert.assertEquals("Query should be finished", OperationState.FINISHED, state);
+
+    // Verify the accumulated logs
+    verifyFetchedLog(logs.toString());
+
+    // Verify the fetched logs from the beginning of the log file
+    verifyFetchedLog(fetchLogFromStart(operationHandle));
+  }
+
+  /**
+   * Verifies that paging through the log of a query with FETCH_NEXT yields as many rows as a
+   * single FETCH_FIRST read of an earlier run of the same query, and that FETCH_FIRST can
+   * still read the log from the beginning after the handle has been paged through.
+   */
+  @Test
+  public void testFetchResultsOfLogWithOrientation() throws Exception {
+    // (FETCH_FIRST) execute the sql once; the size of its log is the expected row count.
+    OperationHandle operationHandle = client.executeStatement(sessionHandle, sql, null);
+    int expectedLogLength = fetchLogFromStart(operationHandle).numRows();
+
+    // (FETCH_NEXT) execute the same sql again and page through its log until a page comes
+    // back with fewer rows than requested.
+    OperationHandle pagedHandle = client.executeStatement(sessionHandle, sql, null);
+    int maxRows = calculateProperMaxRows(expectedLogLength);
+    int logLength = 0;
+    int pageSize;
+    do {
+      pageSize = client.fetchResults(pagedHandle, FetchOrientation.FETCH_NEXT, maxRows,
+          FetchType.LOG).numRows();
+      logLength += pageSize;
+    } while (pageSize == maxRows);
+    Assert.assertEquals(expectedLogLength, logLength);
+
+    // (FETCH_FIRST) fetch again from the same operation handle, from the beginning.
+    verifyFetchedLog(fetchLogFromStart(pagedHandle));
+  }
+
+  /**
+   * Verifies cleanup: fetching the log of a closed operation fails and its log file is gone,
+   * and closing the session removes the session log directory.
+   */
+  @Test
+  public void testFetchResultsOfLogCleanup() throws Exception {
+    // Open a new session, since this case needs to close the session in the end.
+    SessionHandle sessionHandleCleanup = setupSession();
+    boolean sessionClosed = false;
+    try {
+      // prepare
+      OperationHandle operationHandle = client.executeStatement(sessionHandleCleanup, sql, null);
+      verifyFetchedLog(fetchLogFromStart(operationHandle));
+
+      File sessionLogDir = new File(
+          hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION) +
+          File.separator + sessionHandleCleanup.getHandleIdentifier());
+      File operationLogFile =
+          new File(sessionLogDir, operationHandle.getHandleIdentifier().toString());
+
+      // check whether exception is thrown when fetching log from a closed operation.
+      client.closeOperation(operationHandle);
+      try {
+        fetchLogFromStart(operationHandle);
+        Assert.fail("Fetch should fail");
+      } catch (HiveSQLException e) {
+        // String.valueOf keeps a null message from turning into a NullPointerException.
+        Assert.assertTrue("Unexpected error message: " + e.getMessage(),
+            String.valueOf(e.getMessage()).contains("Invalid OperationHandle:"));
+      }
+
+      // check whether operation log file is deleted.
+      Assert.assertFalse("Operation log file should be deleted.", operationLogFile.exists());
+
+      // check whether session log dir is deleted after session is closed.
+      // The flag is set first: if closeSession itself throws, the finally block must not
+      // try to close the same session again.
+      sessionClosed = true;
+      client.closeSession(sessionHandleCleanup);
+      Assert.assertFalse("Session log dir should be deleted.", sessionLogDir.exists());
+    } finally {
+      // Do not leak the extra session when a check above failed before it was closed.
+      if (!sessionClosed) {
+        closeSessionQuietly(sessionHandleCleanup);
+      }
+    }
+  }
+
+  /**
+   * Opens a session, switches it to the embedded lock manager, (re)creates the test table,
+   * loads the data file into it and checks that the table content can be read back.
+   * The session is closed again if any of these steps fails.
+   * @return the handle of the prepared session
+   * @throws Exception if a setup statement fails
+   */
+  private SessionHandle setupSession() throws Exception {
+    // Open a session
+    SessionHandle newSession = client.openSession(null, null, null);
+    boolean ready = false;
+    try {
+      String[] setupStatements = {
+        // Change lock manager to embedded mode
+        "SET hive.lock.manager=org.apache.hadoop.hive.ql.lockmgr.EmbeddedLockManager",
+        // Drop the table if it exists
+        "DROP TABLE IF EXISTS " + tableName,
+        // Create a test table
+        "create table " + tableName + " (key int, value string)",
+        // Load data
+        "load data local inpath '" + dataFile + "' into table " + tableName
+      };
+      for (String statement : setupStatements) {
+        client.executeStatement(newSession, statement, null);
+      }
+
+      // Precondition check: verify whether the table is created and data is fetched correctly.
+      OperationHandle operationHandle = client.executeStatement(newSession, sql, null);
+      RowSet rowSetResult = client.fetchResults(operationHandle);
+      Assert.assertEquals(500, rowSetResult.numRows());
+      Object[] firstRow = rowSetResult.iterator().next();
+      Assert.assertEquals(238, firstRow[0]);
+      Assert.assertEquals("val_238", firstRow[1]);
+
+      ready = true;
+      return newSession;
+    } finally {
+      // Do not leak a half-prepared session; the caller never receives its handle.
+      if (!ready) {
+        closeSessionQuietly(newSession);
+      }
+    }
+  }
+
+  /**
+   * Picks the page size for paging with FETCH_NEXT: 1 for logs shorter than 10 rows, 10 for
+   * logs shorter than 100 rows, 100 otherwise. The log length of the sql operation may vary
+   * during HIVE dev, so the page size is derived from it instead of being fixed.
+   * @param len the number of log rows expected
+   * @return the maximum number of rows to request per fetch
+   */
+  private int calculateProperMaxRows(int len) {
+    if (len < 10) {
+      return 1;
+    }
+    return len < 100 ? 10 : 100;
+  }
+
+  /**
+   * Fetches up to MAX_LOG_ROWS rows of the operation log with FETCH_FIRST.
+   * @param operationHandle the operation whose log is fetched
+   * @return the fetched log rows
+   * @throws Exception if the fetch fails, e.g. because the operation has been closed
+   */
+  private RowSet fetchLogFromStart(OperationHandle operationHandle) throws Exception {
+    return client.fetchResults(operationHandle, FetchOrientation.FETCH_FIRST, MAX_LOG_ROWS,
+        FetchType.LOG);
+  }
+
+  /** Returns true for CANCELED, CLOSED, FINISHED and ERROR, the states that end polling. */
+  private static boolean isTerminal(OperationState state) {
+    return state == OperationState.CANCELED
+        || state == OperationState.CLOSED
+        || state == OperationState.FINISHED
+        || state == OperationState.ERROR;
+  }
+
+  /** Closes a session on a failure path without masking the failure being propagated. */
+  private void closeSessionQuietly(SessionHandle handle) {
+    try {
+      client.closeSession(handle);
+    } catch (Exception ignored) {
+      // Best-effort cleanup: the failure that led here is the one worth reporting.
+    }
+  }
+
+  /**
+   * Concatenates the first column of every row and verifies the resulting log text.
+   * @param rowSet the fetched log rows
+   */
+  private void verifyFetchedLog(RowSet rowSet) {
+    StringBuilder logs = new StringBuilder();
+    for (Object[] row : rowSet) {
+      logs.append(row[0]);
+    }
+    verifyFetchedLog(logs.toString());
+  }
+
+  /**
+   * Asserts that the log text contains every fragment of expectedLogs; the failure message
+   * names the fragment that is missing.
+   * @param logs the fetched log text
+   */
+  private void verifyFetchedLog(String logs) {
+    for (String log : expectedLogs) {
+      Assert.assertTrue("Operation log should contain '" + log + "'", logs.contains(log));
+    }
+  }
+}