You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2014/12/18 19:21:34 UTC

svn commit: r1646504 - /hive/branches/spark/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestMultiSessionsHS2WithLocalClusterSpark.java

Author: xuefu
Date: Thu Dec 18 18:21:33 2014
New Revision: 1646504

URL: http://svn.apache.org/r1646504
Log:
HIVE-9116: Add unit test for multi sessions.[Spark Branch] (Chengxiang via Xuefu)

Added:
    hive/branches/spark/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestMultiSessionsHS2WithLocalClusterSpark.java

Added: hive/branches/spark/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestMultiSessionsHS2WithLocalClusterSpark.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestMultiSessionsHS2WithLocalClusterSpark.java?rev=1646504&view=auto
==============================================================================
--- hive/branches/spark/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestMultiSessionsHS2WithLocalClusterSpark.java (added)
+++ hive/branches/spark/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestMultiSessionsHS2WithLocalClusterSpark.java Thu Dec 18 18:21:33 2014
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.jdbc;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hive.jdbc.miniHS2.MiniHS2;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.session.HiveSessionHook;
+import org.apache.hive.service.cli.session.HiveSessionHookContext;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestMultiSessionsHS2WithLocalClusterSpark {
+  public static final String TEST_TAG = "miniHS2.localClusterSpark.tag";
+  public static final String TEST_TAG_VALUE = "miniHS2.localClusterSpark.value";
+  private static final int PARALLEL_NUMBER = 3;
+
+  public static class LocalClusterSparkSessionHook implements HiveSessionHook {
+    @Override
+    public void run(HiveSessionHookContext sessionHookContext) throws HiveSQLException {
+      sessionHookContext.getSessionConf().set(TEST_TAG, TEST_TAG_VALUE);
+    }
+  }
+
+  private static MiniHS2 miniHS2 = null;
+  private static HiveConf conf;
+  private static Path dataFilePath;
+  private static String dbName = "sparkTestDb";
+  private ThreadLocal<Connection> localConnection = new ThreadLocal<Connection>();
+  private ThreadLocal<Statement> localStatement = new ThreadLocal<Statement>();
+  private ExecutorService pool = null;
+
+
+  private static HiveConf createHiveConf() {
+    HiveConf conf = new HiveConf();
+    conf.set("hive.exec.parallel", "true");
+    conf.set("hive.execution.engine", "spark");
+    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+    conf.set("spark.master", "local-cluster[2,2,1024]");
+    conf.set("spark.deploy.defaultCores", "2");
+    return conf;
+  }
+
+  @BeforeClass
+  public static void beforeTest() throws Exception {
+    Class.forName(MiniHS2.getJdbcDriverName());
+    conf = createHiveConf();
+    conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    String dataFileDir = conf.get("test.data.files").replace('\\', '/')
+      .replace("c:", "");
+    dataFilePath = new Path(dataFileDir, "kv1.txt");
+    DriverManager.setLoginTimeout(0);
+    conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    miniHS2 = new MiniHS2(conf, true);
+    Map<String, String> overlayProps = new HashMap<String, String>();
+    overlayProps.put(ConfVars.HIVE_SERVER2_SESSION_HOOK.varname,
+      LocalClusterSparkSessionHook.class.getName());
+    miniHS2.start(overlayProps);
+    createDb();
+  }
+
+  // setup DB
+  private static void createDb() throws SQLException {
+    Connection conn = DriverManager.
+      getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar");
+    Statement stmt2 = conn.createStatement();
+    stmt2.execute("DROP DATABASE IF EXISTS " + dbName + " CASCADE");
+    stmt2.execute("CREATE DATABASE " + dbName);
+    stmt2.close();
+    conn.close();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    pool = Executors.newFixedThreadPool(PARALLEL_NUMBER,
+      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Test-Thread-%d").build());
+    createConnection();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    pool.shutdownNow();
+    closeConnection();
+  }
+
+  private void createConnection() throws SQLException {
+    Connection connection = DriverManager.getConnection(miniHS2.getJdbcURL(dbName),
+      System.getProperty("user.name"), "bar");
+    Statement statement = connection.createStatement();
+    localConnection.set(connection);
+    localStatement.set(statement);
+    statement.execute("USE " + dbName);
+  }
+
+  private void closeConnection() throws SQLException {
+    if (localStatement.get() != null) {
+      localStatement.get().close();
+    }
+
+    if (localConnection.get() != null) {
+      localConnection.get().close();
+    }
+  }
+
+  @AfterClass
+  public static void afterTest() throws Exception {
+    if (miniHS2 != null && miniHS2.isStarted()) {
+      miniHS2.stop();
+    }
+  }
+
+  /**
+   * Run nonSpark query
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testNonSparkQuery() throws Exception {
+    String tableName = "kvTable1";
+    setupTable(tableName);
+    Callable<Void> runNonSparkQuery = getNonSparkQueryCallable(tableName);
+    runInParallel(runNonSparkQuery);
+    dropTable(tableName);
+  }
+
+  /**
+   * Run spark query
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testSparkQuery() throws Exception {
+    String tableName = "kvTable2";
+    setupTable(tableName);
+    Callable<Void> runSparkQuery = getSparkQueryCallable(tableName);
+    runInParallel(runSparkQuery);
+    dropTable(tableName);
+  }
+
+  private void runInParallel(Callable<Void> runNonSparkQuery) throws InterruptedException, ExecutionException {
+    List<Future> futureList = new LinkedList<Future>();
+    for (int i = 0; i < PARALLEL_NUMBER; i++) {
+      Future future = pool.submit(runNonSparkQuery);
+      futureList.add(future);
+    }
+
+    for (Future future : futureList) {
+      future.get();
+    }
+  }
+
+  private Callable<Void> getNonSparkQueryCallable(final String tableName) {
+    return new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        String resultVal = "val_238";
+        String queryStr = "SELECT * FROM " + tableName;
+        testKvQuery(queryStr, resultVal);
+        return null;
+      }
+    };
+  }
+
+  private Callable<Void> getSparkQueryCallable(final String tableName) {
+    return new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        String resultVal = "val_238";
+        String queryStr = "SELECT * FROM " + tableName +
+          " where value = '" + resultVal + "'";
+        testKvQuery(queryStr, resultVal);
+        return null;
+      }
+    };
+  }
+
+  private void testKvQuery(String queryStr, String resultVal)
+    throws SQLException {
+    createConnection();
+    verifyResult(queryStr, resultVal, 2);
+    closeConnection();
+  }
+
+  // create table and load kv1.txt
+  private void setupTable(String tableName) throws SQLException {
+    Statement statement = localStatement.get();
+    // create table
+    statement.execute("CREATE TABLE " + tableName
+      + " (under_col INT COMMENT 'the under column', value STRING)"
+      + " COMMENT ' test table'");
+
+    // load data
+    statement.execute("LOAD DATA LOCAL INPATH '"
+      + dataFilePath.toString() + "' INTO TABLE " + tableName);
+  }
+
+  private void dropTable(String tableName) throws SQLException {
+    localStatement.get().execute("DROP TABLE " + tableName);
+  }
+
+  // run given query and validate expected result
+  private void verifyResult(String queryStr, String expString, int colPos)
+    throws SQLException {
+    ResultSet res = localStatement.get().executeQuery(queryStr);
+    assertTrue(res.next());
+    assertEquals(expString, res.getString(colPos));
+    res.close();
+  }
+}