Posted to commits@hive.apache.org by se...@apache.org on 2016/04/07 03:33:17 UTC

hive git commit: HIVE-13268 : Add a HA mini cluster type in MiniHS2 (Takanobu Asanuma, reviewed by Sergey Shelukhin)

Repository: hive
Updated Branches:
  refs/heads/master 7420e3658 -> 4f9194d16


HIVE-13268 : Add a HA mini cluster type in MiniHS2 (Takanobu Asanuma, reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4f9194d1
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4f9194d1
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4f9194d1

Branch: refs/heads/master
Commit: 4f9194d1621d7f6574fa03100cf25bc927835ded
Parents: 7420e36
Author: Sergey Shelukhin <se...@apache.org>
Authored: Wed Apr 6 18:32:54 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Wed Apr 6 18:32:54 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hive/jdbc/miniHS2/MiniHS2.java   |  15 +-
 .../apache/hive/jdbc/TestJdbcWithMiniHA.java    | 200 +++++++++++++++++++
 .../apache/hadoop/hive/shims/Hadoop23Shims.java |  34 +++-
 .../apache/hadoop/hive/shims/HadoopShims.java   |   6 +
 4 files changed, 246 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
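
Usage note (editorial, not part of the commit): the new Builder flag is the only API change most tests need. A minimal sketch, mirroring the TestJdbcWithMiniHA setup further below; the HiveConf and overlay handling here are illustrative:

    // Build a MiniHS2 whose backing mini DFS runs two NameNodes (HA).
    HiveConf conf = new HiveConf();
    MiniHS2 miniHS2 = new MiniHS2.Builder()
        .withConf(conf)
        .withMiniMR()
        .withHA()   // new in HIVE-13268
        .build();
    miniHS2.start(new HashMap<String, String>());
    // ... run JDBC tests against miniHS2.getJdbcURL() ...
    miniHS2.stop();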


http://git-wip-us.apache.org/repos/asf/hive/blob/4f9194d1/itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java b/itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
index 6b337d2..9ab5566 100644
--- a/itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
+++ b/itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
@@ -86,6 +86,7 @@ public class MiniHS2 extends AbstractHiveService {
     private boolean isMetastoreRemote;
     private boolean usePortsFromConf = false;
     private String authType = "KERBEROS";
+    private boolean isHA = false;
 
     public Builder() {
     }
@@ -117,6 +118,11 @@ public class MiniHS2 extends AbstractHiveService {
       return this;
     }
 
+    public Builder withHA() {
+      this.isHA = true;
+      return this;
+    }
+
     /**
      * Start HS2 with HTTP transport mode, default is binary mode
      * @return this Builder
@@ -137,7 +143,7 @@ public class MiniHS2 extends AbstractHiveService {
         hiveConf.setVar(ConfVars.HIVE_SERVER2_TRANSPORT_MODE, HS2_BINARY_MODE);
       }
       return new MiniHS2(hiveConf, miniClusterType, useMiniKdc, serverPrincipal, serverKeytab,
-          isMetastoreRemote, usePortsFromConf, authType);
+          isMetastoreRemote, usePortsFromConf, authType, isHA);
     }
   }
 
@@ -175,7 +181,7 @@ public class MiniHS2 extends AbstractHiveService {
 
   private MiniHS2(HiveConf hiveConf, MiniClusterType miniClusterType, boolean useMiniKdc,
       String serverPrincipal, String serverKeytab, boolean isMetastoreRemote,
-      boolean usePortsFromConf, String authType) throws Exception {
+      boolean usePortsFromConf, String authType, boolean isHA) throws Exception {
     super(hiveConf, "localhost",
         (usePortsFromConf ? hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT) : MetaStoreUtils.findFreePort()),
         (usePortsFromConf ? hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT) : MetaStoreUtils.findFreePort()));
@@ -189,7 +195,7 @@ public class MiniHS2 extends AbstractHiveService {
 
     if (miniClusterType != MiniClusterType.DFS_ONLY) {
       // Initialize dfs
-      dfs = ShimLoader.getHadoopShims().getMiniDfs(hiveConf, 4, true, null);
+      dfs = ShimLoader.getHadoopShims().getMiniDfs(hiveConf, 4, true, null, isHA);
       fs = dfs.getFileSystem();
       String uriString = WindowsPathUtil.getHdfsUriString(fs.getUri().toString());
 
@@ -266,7 +272,8 @@ public class MiniHS2 extends AbstractHiveService {
 
   public MiniHS2(HiveConf hiveConf, MiniClusterType clusterType,
       boolean usePortsFromConf) throws Exception {
-    this(hiveConf, clusterType, false, null, null, false, usePortsFromConf, "KERBEROS");
+    this(hiveConf, clusterType, false, null, null, false, usePortsFromConf,
+      "KERBEROS", false);
   }
 
   public void start(Map<String, String> confOverlay) throws Exception {

http://git-wip-us.apache.org/repos/asf/hive/blob/4f9194d1/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHA.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHA.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHA.java
new file mode 100644
index 0000000..84644d1
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHA.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.jdbc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HAUtil;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hive.jdbc.miniHS2.MiniHS2;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.session.HiveSessionHook;
+import org.apache.hive.service.cli.session.HiveSessionHookContext;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * This class is cloned from TestJdbcWithMiniMR, except that it uses NameNode HA.
+ */
+public class TestJdbcWithMiniHA {
+  public static final String TEST_TAG = "miniHS2.miniHA.tag";
+  public static final String TEST_TAG_VALUE = "miniHS2.miniHA.value";
+  public static class HATestSessionHook implements HiveSessionHook {
+    @Override
+    public void run(HiveSessionHookContext sessionHookContext) throws HiveSQLException {
+      sessionHookContext.getSessionConf().set(TEST_TAG, TEST_TAG_VALUE);
+    }
+  }
+
+  private static MiniHS2 miniHS2 = null;
+  private static HiveConf conf;
+  private static Path dataFilePath;
+  private static String dbName = "mrTestDb";
+  private Connection hs2Conn = null;
+  private Statement stmt;
+
+  @BeforeClass
+  public static void beforeTest() throws Exception {
+    Class.forName(MiniHS2.getJdbcDriverName());
+    conf = new HiveConf();
+    conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    String dataFileDir = conf.get("test.data.files").replace('\\', '/')
+        .replace("c:", "");
+    dataFilePath = new Path(dataFileDir, "kv1.txt");
+    DriverManager.setLoginTimeout(0);
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    miniHS2 = new MiniHS2.Builder().withConf(conf).withMiniMR().withHA().build();
+    Map<String, String> overlayProps = new HashMap<String, String>();
+    overlayProps.put(ConfVars.HIVE_SERVER2_SESSION_HOOK.varname,
+        HATestSessionHook.class.getName());
+    miniHS2.start(overlayProps);
+    assertTrue(HAUtil.isHAEnabled(conf, DFSUtil.getNamenodeNameServiceId(conf)));
+    createDb();
+  }
+
+  // setup DB
+  private static void createDb() throws Exception {
+    Connection conn = DriverManager.
+        getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar");
+    Statement stmt2 = conn.createStatement();
+    stmt2.execute("DROP DATABASE IF EXISTS " + dbName + " CASCADE");
+    stmt2.execute("CREATE DATABASE " + dbName);
+    stmt2.close();
+    conn.close();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    hs2Conn = DriverManager.getConnection(miniHS2.getJdbcURL(dbName),
+        System.getProperty("user.name"), "bar");
+    stmt = hs2Conn.createStatement();
+    stmt.execute("USE " + dbName);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (hs2Conn != null) {
+      hs2Conn.close();
+    }
+  }
+
+  @AfterClass
+  public static void afterTest() throws Exception {
+    if (miniHS2 != null && miniHS2.isStarted()) {
+      miniHS2.stop();
+    }
+  }
+
+  /**
+   * Verify that the connection to MiniHS2 is successful
+   * @throws Exception
+   */
+  @Test
+  public void testConnection() throws Exception {
+    // the session hook should set the property
+    verifyProperty(TEST_TAG, TEST_TAG_VALUE);
+  }
+
+  /**
+   * Run a non-MR query
+   * @throws Exception
+   */
+  @Test
+  public void testNonMrQuery() throws Exception {
+    String tableName = "testTab1";
+    String resultVal = "val_238";
+    String queryStr = "SELECT * FROM " + tableName;
+
+    testKvQuery(tableName, queryStr, resultVal);
+  }
+
+  /**
+   * Run an MR query
+   * @throws Exception
+   */
+  @Test
+  public void testMrQuery() throws Exception {
+    String tableName = "testTab2";
+    String resultVal = "val_238";
+    String queryStr = "SELECT * FROM " + tableName +
+        " where value = '" + resultVal + "'";
+
+    testKvQuery(tableName, queryStr, resultVal);
+  }
+
+  /**
+   * Verify that the given property contains the expected value
+   * @param propertyName
+   * @param expectedValue
+   * @throws Exception
+   */
+  private void verifyProperty(String propertyName, String expectedValue) throws Exception {
+    Statement stmt = hs2Conn.createStatement();
+    ResultSet res = stmt.executeQuery("set " + propertyName);
+    assertTrue(res.next());
+    String results[] = res.getString(1).split("=");
+    assertEquals("Property should be set", results.length, 2);
+    assertEquals("Property should be set", expectedValue, results[1]);
+  }
+
+  // create table, run the query, and verify the result
+  private void testKvQuery(String tableName, String queryStr, String resultVal)
+      throws SQLException {
+    setupKv1Tabs(tableName);
+    verifyResult(queryStr, resultVal, 2);
+    stmt.execute("DROP TABLE " + tableName);
+  }
+
+  // create table and populate with kv1.txt
+  private void setupKv1Tabs(String tableName) throws SQLException {
+    Statement stmt = hs2Conn.createStatement();
+    // create table
+    stmt.execute("CREATE TABLE " + tableName
+        + " (under_col INT COMMENT 'the under column', value STRING)"
+        + " COMMENT ' test table'");
+
+    // load data
+    stmt.execute("load data local inpath '"
+        + dataFilePath.toString() + "' into table " + tableName);
+  }
+
+  // run the given query and validate the expected result
+  private void verifyResult(String queryStr, String expString, int colPos)
+      throws SQLException {
+    ResultSet res = stmt.executeQuery(queryStr);
+    assertTrue(res.next());
+    assertEquals(expString, res.getString(colPos));
+    res.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/4f9194d1/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
----------------------------------------------------------------------
diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index e028212..c21088f 100644
--- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
@@ -514,6 +515,13 @@ public class Hadoop23Shims extends HadoopShimsSecure {
     }
   }
 
+  @Override
+  public HadoopShims.MiniDFSShim getMiniDfs(Configuration conf,
+      int numDataNodes,
+      boolean format,
+      String[] racks) throws IOException {
+    return getMiniDfs(conf, numDataNodes, format, racks, false);
+  }
   // Don't move this code to the parent class. There's a binary
   // incompatibility between hadoop 1 and 2 wrt MiniDFSCluster and we
   // need to have two different shim classes even though they are
@@ -522,16 +530,32 @@ public class Hadoop23Shims extends HadoopShimsSecure {
   public HadoopShims.MiniDFSShim getMiniDfs(Configuration conf,
       int numDataNodes,
       boolean format,
-      String[] racks) throws IOException {
+      String[] racks,
+      boolean isHA) throws IOException {
     configureImpersonation(conf);
-    MiniDFSCluster miniDFSCluster = new MiniDFSCluster(conf, numDataNodes, format, racks);
+    MiniDFSCluster miniDFSCluster;
+    if (isHA) {
+      MiniDFSNNTopology topo = new MiniDFSNNTopology()
+        .addNameservice(new MiniDFSNNTopology.NSConf("minidfs").addNN(
+          new MiniDFSNNTopology.NNConf("nn1")).addNN(
+          new MiniDFSNNTopology.NNConf("nn2")));
+      miniDFSCluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(numDataNodes).format(format)
+        .racks(racks).nnTopology(topo).build();
+      miniDFSCluster.waitActive();
+      miniDFSCluster.transitionToActive(0);
+    } else {
+      miniDFSCluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(numDataNodes).format(format)
+        .racks(racks).build();
+    }
 
     // Need to set the client's KeyProvider to the NN's for JKS,
     // else the updates do not get flushed properly
-    KeyProviderCryptoExtension keyProvider =  miniDFSCluster.getNameNode().getNamesystem().getProvider();
+    KeyProviderCryptoExtension keyProvider =  miniDFSCluster.getNameNode(0).getNamesystem().getProvider();
     if (keyProvider != null) {
       try {
-        setKeyProvider(miniDFSCluster.getFileSystem().getClient(), keyProvider);
+        setKeyProvider(miniDFSCluster.getFileSystem(0).getClient(), keyProvider);
       } catch (Exception err) {
         throw new IOException(err);
       }
@@ -571,7 +595,7 @@ public class Hadoop23Shims extends HadoopShimsSecure {
 
     @Override
     public FileSystem getFileSystem() throws IOException {
-      return cluster.getFileSystem();
+      return cluster.getFileSystem(0);
     }
 
     @Override

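The HA branch above stands up a single nameservice ("minidfs") with two NameNodes and makes the first one active. As a hedged illustration of what that enables (this snippet is not in the patch), the same MiniDFSCluster API supports a manual failover between the two NameNodes:

    // Sketch only: fail over from nn1 to nn2 on the HA mini cluster.
    // Assumes miniDFSCluster was built with the two-NN topology above.
    miniDFSCluster.transitionToStandby(0);   // demote nn1
    miniDFSCluster.transitionToActive(1);    // promote nn2
    miniDFSCluster.waitActive();
    // Clients addressing the logical nameservice URI keep working.
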
http://git-wip-us.apache.org/repos/asf/hive/blob/4f9194d1/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
----------------------------------------------------------------------
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index a44d0c0..4b9119b 100644
--- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -113,6 +113,12 @@ public interface HadoopShims {
       boolean format,
       String[] racks) throws IOException;
 
+  MiniDFSShim getMiniDfs(Configuration conf,
+      int numDataNodes,
+      boolean format,
+      String[] racks,
+      boolean isHA) throws IOException;
+
   /**
    * Shim around the functions in MiniDFSCluster that Hive uses.
    */
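
For callers that need the HA mini DFS without going through MiniHS2, the new overload is reachable via the shim loader, as the MiniHS2 change above does. A minimal sketch, assuming the usual MiniDFSShim lifecycle (getFileSystem() and shutdown() as declared on this interface):

    // Sketch: start a 4-datanode HA mini DFS directly via the shims.
    Configuration conf = new Configuration();
    HadoopShims.MiniDFSShim dfs =
        ShimLoader.getHadoopShims().getMiniDfs(conf, 4, true, null, true /* isHA */);
    FileSystem fs = dfs.getFileSystem();   // filesystem of the active (first) NameNode
    // ... exercise HDFS ...
    dfs.shutdown();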