You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2014/12/18 19:21:34 UTC
svn commit: r1646504 -
/hive/branches/spark/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestMultiSessionsHS2WithLocalClusterSpark.java
Author: xuefu
Date: Thu Dec 18 18:21:33 2014
New Revision: 1646504
URL: http://svn.apache.org/r1646504
Log:
HIVE-9116: Add unit test for multi sessions.[Spark Branch] (Chengxiang via Xuefu)
Added:
hive/branches/spark/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestMultiSessionsHS2WithLocalClusterSpark.java
Added: hive/branches/spark/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestMultiSessionsHS2WithLocalClusterSpark.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestMultiSessionsHS2WithLocalClusterSpark.java?rev=1646504&view=auto
==============================================================================
--- hive/branches/spark/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestMultiSessionsHS2WithLocalClusterSpark.java (added)
+++ hive/branches/spark/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestMultiSessionsHS2WithLocalClusterSpark.java Thu Dec 18 18:21:33 2014
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.jdbc;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hive.jdbc.miniHS2.MiniHS2;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.session.HiveSessionHook;
+import org.apache.hive.service.cli.session.HiveSessionHookContext;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Exercises several concurrent JDBC sessions against a single {@link MiniHS2} instance
+ * whose execution engine is configured as Spark with a local-cluster master.
+ *
+ * Each test creates and loads a table through the test thread's own connection, then
+ * submits {@code PARALLEL_NUMBER} copies of a query task to a thread pool. Every task
+ * opens its own connection, runs the query, checks the first row and closes the
+ * connection again.
+ */
+public class TestMultiSessionsHS2WithLocalClusterSpark {
+  public static final String TEST_TAG = "miniHS2.localClusterSpark.tag";
+  public static final String TEST_TAG_VALUE = "miniHS2.localClusterSpark.value";
+  // Number of worker threads, and also the number of query tasks submitted per test.
+  private static final int PARALLEL_NUMBER = 3;
+
+  /** Session hook that stores {@code TEST_TAG} in the conf of every session it is run for. */
+  public static class LocalClusterSparkSessionHook implements HiveSessionHook {
+    @Override
+    public void run(HiveSessionHookContext sessionHookContext) throws HiveSQLException {
+      sessionHookContext.getSessionConf().set(TEST_TAG, TEST_TAG_VALUE);
+    }
+  }
+
+  private static MiniHS2 miniHS2 = null;
+  private static HiveConf conf;
+  private static Path dataFilePath;
+  private static final String dbName = "sparkTestDb";
+  // Connection and statement are kept per thread: the test thread and every pool worker
+  // each work on their own JDBC session.
+  private final ThreadLocal<Connection> localConnection = new ThreadLocal<Connection>();
+  private final ThreadLocal<Statement> localStatement = new ThreadLocal<Statement>();
+  private ExecutorService pool = null;
+
+  /** Builds the HiveConf used by the mini HS2: Spark engine on a local-cluster master. */
+  private static HiveConf createHiveConf() {
+    HiveConf conf = new HiveConf();
+    conf.set("hive.exec.parallel", "true");
+    conf.set("hive.execution.engine", "spark");
+    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+    conf.set("spark.master", "local-cluster[2,2,1024]");
+    conf.set("spark.deploy.defaultCores", "2");
+    return conf;
+  }
+
+  /**
+   * Starts the shared MiniHS2 instance with the session hook installed and creates the
+   * test database.
+   *
+   * @throws Exception if the driver cannot be loaded, the data directory is not
+   *         configured, or the server/database cannot be set up
+   */
+  @BeforeClass
+  public static void beforeTest() throws Exception {
+    Class.forName(MiniHS2.getJdbcDriverName());
+    conf = createHiveConf();
+    conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    String dataFileDir = conf.get("test.data.files");
+    if (dataFileDir == null) {
+      // Fail with a readable message instead of a bare NullPointerException.
+      throw new IllegalStateException("test.data.files is not set; cannot locate kv1.txt");
+    }
+    // Normalise Windows-style paths (backslashes, drive prefix) into a form Path accepts.
+    dataFileDir = dataFileDir.replace('\\', '/').replace("c:", "");
+    dataFilePath = new Path(dataFileDir, "kv1.txt");
+    // A login timeout of 0 means "no limit" for DriverManager.
+    DriverManager.setLoginTimeout(0);
+    miniHS2 = new MiniHS2(conf, true);
+    Map<String, String> overlayProps = new HashMap<String, String>();
+    overlayProps.put(ConfVars.HIVE_SERVER2_SESSION_HOOK.varname,
+        LocalClusterSparkSessionHook.class.getName());
+    miniHS2.start(overlayProps);
+    createDb();
+  }
+
+  /**
+   * Drops and recreates the test database on a short-lived connection, which is closed
+   * even when one of the statements fails.
+   */
+  private static void createDb() throws SQLException {
+    Connection conn = DriverManager.getConnection(miniHS2.getJdbcURL(),
+        System.getProperty("user.name"), "bar");
+    try {
+      Statement stmt = conn.createStatement();
+      try {
+        stmt.execute("DROP DATABASE IF EXISTS " + dbName + " CASCADE");
+        stmt.execute("CREATE DATABASE " + dbName);
+      } finally {
+        stmt.close();
+      }
+    } finally {
+      conn.close();
+    }
+  }
+
+  /** Creates the daemon worker pool and opens the test thread's own connection. */
+  @Before
+  public void setUp() throws Exception {
+    pool = Executors.newFixedThreadPool(PARALLEL_NUMBER,
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Test-Thread-%d").build());
+    createConnection();
+  }
+
+  /** Stops the worker pool and closes the test thread's connection. */
+  @After
+  public void tearDown() throws Exception {
+    try {
+      // pool is still null when setUp() failed before creating it.
+      if (pool != null) {
+        pool.shutdownNow();
+      }
+    } finally {
+      closeConnection();
+    }
+  }
+
+  /**
+   * Opens a connection and statement for the calling thread and switches to the test
+   * database. Each handle is stored in its thread-local slot as soon as it exists, so a
+   * later {@link #closeConnection()} releases it even if a subsequent step here throws.
+   */
+  private void createConnection() throws SQLException {
+    Connection connection = DriverManager.getConnection(miniHS2.getJdbcURL(dbName),
+        System.getProperty("user.name"), "bar");
+    localConnection.set(connection);
+    Statement statement = connection.createStatement();
+    localStatement.set(statement);
+    statement.execute("USE " + dbName);
+  }
+
+  /**
+   * Closes the calling thread's statement and connection, if any, and clears both
+   * thread-local slots so a reused pool thread never sees closed handles. The connection
+   * is closed even when closing the statement fails.
+   */
+  private void closeConnection() throws SQLException {
+    Statement statement = localStatement.get();
+    Connection connection = localConnection.get();
+    localStatement.remove();
+    localConnection.remove();
+    try {
+      if (statement != null) {
+        statement.close();
+      }
+    } finally {
+      if (connection != null) {
+        connection.close();
+      }
+    }
+  }
+
+  /** Stops the shared MiniHS2 instance if it was started. */
+  @AfterClass
+  public static void afterTest() throws Exception {
+    if (miniHS2 != null && miniHS2.isStarted()) {
+      miniHS2.stop();
+    }
+  }
+
+  /**
+   * Runs the "non-Spark" query (a plain {@code SELECT *}, presumably served without
+   * launching a Spark job) from several sessions in parallel.
+   *
+   * @throws Exception if table setup, any parallel query, or cleanup fails
+   */
+  @Test
+  public void testNonSparkQuery() throws Exception {
+    String tableName = "kvTable1";
+    setupTable(tableName);
+    try {
+      runInParallel(getNonSparkQueryCallable(tableName));
+    } finally {
+      // Drop the table even when a parallel query failed.
+      dropTable(tableName);
+    }
+  }
+
+  /**
+   * Runs the "Spark" query (a {@code SELECT} with a filter on {@code value}) from several
+   * sessions in parallel.
+   *
+   * @throws Exception if table setup, any parallel query, or cleanup fails
+   */
+  @Test
+  public void testSparkQuery() throws Exception {
+    String tableName = "kvTable2";
+    setupTable(tableName);
+    try {
+      runInParallel(getSparkQueryCallable(tableName));
+    } finally {
+      // Drop the table even when a parallel query failed.
+      dropTable(tableName);
+    }
+  }
+
+  /**
+   * Submits {@code PARALLEL_NUMBER} copies of the task to the pool and waits for all of
+   * them in submission order.
+   *
+   * @param queryTask the task to run concurrently
+   * @throws InterruptedException if the calling thread is interrupted while waiting
+   * @throws ExecutionException if any task threw; the task's exception is the cause
+   */
+  private void runInParallel(Callable<Void> queryTask)
+      throws InterruptedException, ExecutionException {
+    List<Future<Void>> futureList = new LinkedList<Future<Void>>();
+    for (int i = 0; i < PARALLEL_NUMBER; i++) {
+      futureList.add(pool.submit(queryTask));
+    }
+
+    for (Future<Void> future : futureList) {
+      future.get();
+    }
+  }
+
+  /** Returns a task that selects every row of the table and checks the first row's value. */
+  private Callable<Void> getNonSparkQueryCallable(final String tableName) {
+    return new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        String resultVal = "val_238";
+        String queryStr = "SELECT * FROM " + tableName;
+        testKvQuery(queryStr, resultVal);
+        return null;
+      }
+    };
+  }
+
+  /** Returns a task that selects the rows whose value is "val_238" and checks the first row. */
+  private Callable<Void> getSparkQueryCallable(final String tableName) {
+    return new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        String resultVal = "val_238";
+        String queryStr = "SELECT * FROM " + tableName +
+            " where value = '" + resultVal + "'";
+        testKvQuery(queryStr, resultVal);
+        return null;
+      }
+    };
+  }
+
+  /**
+   * Opens a connection for the calling thread, verifies the query result in column 2 and
+   * always closes the connection again, including when the verification fails.
+   */
+  private void testKvQuery(String queryStr, String resultVal)
+      throws SQLException {
+    try {
+      createConnection();
+      verifyResult(queryStr, resultVal, 2);
+    } finally {
+      closeConnection();
+    }
+  }
+
+  /** Creates the table and loads kv1.txt into it, using the calling thread's statement. */
+  private void setupTable(String tableName) throws SQLException {
+    Statement statement = localStatement.get();
+    // create table
+    statement.execute("CREATE TABLE " + tableName
+        + " (under_col INT COMMENT 'the under column', value STRING)"
+        + " COMMENT ' test table'");
+
+    // load data
+    statement.execute("LOAD DATA LOCAL INPATH '"
+        + dataFilePath.toString() + "' INTO TABLE " + tableName);
+  }
+
+  /** Drops the table using the calling thread's statement. */
+  private void dropTable(String tableName) throws SQLException {
+    localStatement.get().execute("DROP TABLE " + tableName);
+  }
+
+  /**
+   * Runs the query on the calling thread's statement and asserts that a first row exists
+   * and that its column {@code colPos} equals {@code expString}. The result set is closed
+   * even when an assertion fails.
+   */
+  private void verifyResult(String queryStr, String expString, int colPos)
+      throws SQLException {
+    ResultSet res = localStatement.get().executeQuery(queryStr);
+    try {
+      assertTrue(res.next());
+      assertEquals(expString, res.getString(colPos));
+    } finally {
+      res.close();
+    }
+  }
+}