You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2016/04/07 03:33:17 UTC
hive git commit: HIVE-13268 : Add a HA mini cluster type in MiniHS2
(Takanobu Asanuma, reviewed by Sergey Shelukhin)
Repository: hive
Updated Branches:
refs/heads/master 7420e3658 -> 4f9194d16
HIVE-13268 : Add a HA mini cluster type in MiniHS2 (Takanobu Asanuma, reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4f9194d1
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4f9194d1
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4f9194d1
Branch: refs/heads/master
Commit: 4f9194d1621d7f6574fa03100cf25bc927835ded
Parents: 7420e36
Author: Sergey Shelukhin <se...@apache.org>
Authored: Wed Apr 6 18:32:54 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Wed Apr 6 18:32:54 2016 -0700
----------------------------------------------------------------------
.../org/apache/hive/jdbc/miniHS2/MiniHS2.java | 15 +-
.../apache/hive/jdbc/TestJdbcWithMiniHA.java | 200 +++++++++++++++++++
.../apache/hadoop/hive/shims/Hadoop23Shims.java | 34 +++-
.../apache/hadoop/hive/shims/HadoopShims.java | 6 +
4 files changed, 246 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/4f9194d1/itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java b/itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
index 6b337d2..9ab5566 100644
--- a/itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
+++ b/itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
@@ -86,6 +86,7 @@ public class MiniHS2 extends AbstractHiveService {
private boolean isMetastoreRemote;
private boolean usePortsFromConf = false;
private String authType = "KERBEROS";
+ private boolean isHA = false;
public Builder() {
}
@@ -117,6 +118,11 @@ public class MiniHS2 extends AbstractHiveService {
return this;
}
+ public Builder withHA() {
+ this.isHA = true;
+ return this;
+ }
+
/**
* Start HS2 with HTTP transport mode, default is binary mode
* @return this Builder
@@ -137,7 +143,7 @@ public class MiniHS2 extends AbstractHiveService {
hiveConf.setVar(ConfVars.HIVE_SERVER2_TRANSPORT_MODE, HS2_BINARY_MODE);
}
return new MiniHS2(hiveConf, miniClusterType, useMiniKdc, serverPrincipal, serverKeytab,
- isMetastoreRemote, usePortsFromConf, authType);
+ isMetastoreRemote, usePortsFromConf, authType, isHA);
}
}
@@ -175,7 +181,7 @@ public class MiniHS2 extends AbstractHiveService {
private MiniHS2(HiveConf hiveConf, MiniClusterType miniClusterType, boolean useMiniKdc,
String serverPrincipal, String serverKeytab, boolean isMetastoreRemote,
- boolean usePortsFromConf, String authType) throws Exception {
+ boolean usePortsFromConf, String authType, boolean isHA) throws Exception {
super(hiveConf, "localhost",
(usePortsFromConf ? hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT) : MetaStoreUtils.findFreePort()),
(usePortsFromConf ? hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT) : MetaStoreUtils.findFreePort()));
@@ -189,7 +195,7 @@ public class MiniHS2 extends AbstractHiveService {
if (miniClusterType != MiniClusterType.DFS_ONLY) {
// Initialize dfs
- dfs = ShimLoader.getHadoopShims().getMiniDfs(hiveConf, 4, true, null);
+ dfs = ShimLoader.getHadoopShims().getMiniDfs(hiveConf, 4, true, null, isHA);
fs = dfs.getFileSystem();
String uriString = WindowsPathUtil.getHdfsUriString(fs.getUri().toString());
@@ -266,7 +272,8 @@ public class MiniHS2 extends AbstractHiveService {
public MiniHS2(HiveConf hiveConf, MiniClusterType clusterType,
boolean usePortsFromConf) throws Exception {
- this(hiveConf, clusterType, false, null, null, false, usePortsFromConf, "KERBEROS");
+ this(hiveConf, clusterType, false, null, null, false, usePortsFromConf,
+ "KERBEROS", false);
}
public void start(Map<String, String> confOverlay) throws Exception {
http://git-wip-us.apache.org/repos/asf/hive/blob/4f9194d1/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHA.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHA.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHA.java
new file mode 100644
index 0000000..84644d1
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHA.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.jdbc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HAUtil;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hive.jdbc.miniHS2.MiniHS2;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.session.HiveSessionHook;
+import org.apache.hive.service.cli.session.HiveSessionHookContext;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * This class is cloned from TestJdbcWithMiniMR, except use Namenode HA.
+ */
+public class TestJdbcWithMiniHA {
+ public static final String TEST_TAG = "miniHS2.miniHA.tag";
+ public static final String TEST_TAG_VALUE = "miniHS2.miniHA.value";
+ public static class HATestSessionHook implements HiveSessionHook {
+ @Override
+ public void run(HiveSessionHookContext sessionHookContext) throws HiveSQLException {
+ sessionHookContext.getSessionConf().set(TEST_TAG, TEST_TAG_VALUE);
+ }
+ }
+
+ private static MiniHS2 miniHS2 = null;
+ private static HiveConf conf;
+ private static Path dataFilePath;
+ private static String dbName = "mrTestDb";
+ private Connection hs2Conn = null;
+ private Statement stmt;
+
+ @BeforeClass
+ public static void beforeTest() throws Exception {
+ Class.forName(MiniHS2.getJdbcDriverName());
+ conf = new HiveConf();
+ conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+ String dataFileDir = conf.get("test.data.files").replace('\\', '/')
+ .replace("c:", "");
+ dataFilePath = new Path(dataFileDir, "kv1.txt");
+ DriverManager.setLoginTimeout(0);
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+ miniHS2 = new MiniHS2.Builder().withConf(conf).withMiniMR().withHA().build();
+ Map<String, String> overlayProps = new HashMap<String, String>();
+ overlayProps.put(ConfVars.HIVE_SERVER2_SESSION_HOOK.varname,
+ HATestSessionHook.class.getName());
+ miniHS2.start(overlayProps);
+ assertTrue(HAUtil.isHAEnabled(conf, DFSUtil.getNamenodeNameServiceId(conf)));
+ createDb();
+ }
+
+ // setup DB
+ private static void createDb() throws Exception {
+ Connection conn = DriverManager.
+ getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar");
+ Statement stmt2 = conn.createStatement();
+ stmt2.execute("DROP DATABASE IF EXISTS " + dbName + " CASCADE");
+ stmt2.execute("CREATE DATABASE " + dbName);
+ stmt2.close();
+ conn.close();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ hs2Conn = DriverManager.getConnection(miniHS2.getJdbcURL(dbName),
+ System.getProperty("user.name"), "bar");
+ stmt = hs2Conn.createStatement();
+ stmt.execute("USE " + dbName);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (hs2Conn != null) {
+ hs2Conn.close();
+ }
+ }
+
+ @AfterClass
+ public static void afterTest() throws Exception {
+ if (miniHS2 != null && miniHS2.isStarted()) {
+ miniHS2.stop();
+ }
+ }
+
+ /**
+ * Verify that the connection to MiniHS2 is successful
+ * @throws Exception
+ */
+ @Test
+ public void testConnection() throws Exception {
+ // the session hook should set the property
+ verifyProperty(TEST_TAG, TEST_TAG_VALUE);
+ }
+
+ /**
+ * Run nonMr query
+ * @throws Exception
+ */
+ @Test
+ public void testNonMrQuery() throws Exception {
+ String tableName = "testTab1";
+ String resultVal = "val_238";
+ String queryStr = "SELECT * FROM " + tableName;
+
+ testKvQuery(tableName, queryStr, resultVal);
+ }
+
+ /**
+ * Run nonMr query
+ * @throws Exception
+ */
+ @Test
+ public void testMrQuery() throws Exception {
+ String tableName = "testTab2";
+ String resultVal = "val_238";
+ String queryStr = "SELECT * FROM " + tableName +
+ " where value = '" + resultVal + "'";
+
+ testKvQuery(tableName, queryStr, resultVal);
+ }
+
+ /**
+ * Verify if the given property contains the expected value
+ * @param propertyName
+ * @param expectedValue
+ * @throws Exception
+ */
+ private void verifyProperty(String propertyName, String expectedValue) throws Exception {
+ Statement stmt = hs2Conn .createStatement();
+ ResultSet res = stmt.executeQuery("set " + propertyName);
+ assertTrue(res.next());
+ String results[] = res.getString(1).split("=");
+ assertEquals("Property should be set", results.length, 2);
+ assertEquals("Property should be set", expectedValue, results[1]);
+ }
+
+ // create tables, verify query
+ private void testKvQuery(String tableName, String queryStr, String resultVal)
+ throws SQLException {
+ setupKv1Tabs(tableName);
+ verifyResult(queryStr, resultVal, 2);
+ stmt.execute("DROP TABLE " + tableName);
+ }
+
+ // create table and pupulate with kv1.txt
+ private void setupKv1Tabs(String tableName) throws SQLException {
+ Statement stmt = hs2Conn.createStatement();
+ // create table
+ stmt.execute("CREATE TABLE " + tableName
+ + " (under_col INT COMMENT 'the under column', value STRING)"
+ + " COMMENT ' test table'");
+
+ // load data
+ stmt.execute("load data local inpath '"
+ + dataFilePath.toString() + "' into table " + tableName);
+ }
+
+ // run given query and validate expecated result
+ private void verifyResult(String queryStr, String expString, int colPos)
+ throws SQLException {
+ ResultSet res = stmt.executeQuery(queryStr);
+ assertTrue(res.next());
+ assertEquals(expString, res.getString(colPos));
+ res.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/4f9194d1/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
----------------------------------------------------------------------
diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index e028212..c21088f 100644
--- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
@@ -514,6 +515,13 @@ public class Hadoop23Shims extends HadoopShimsSecure {
}
}
+ @Override
+ public HadoopShims.MiniDFSShim getMiniDfs(Configuration conf,
+ int numDataNodes,
+ boolean format,
+ String[] racks) throws IOException{
+ return getMiniDfs(conf, numDataNodes, format, racks, false);
+ }
// Don't move this code to the parent class. There's a binary
// incompatibility between hadoop 1 and 2 wrt MiniDFSCluster and we
// need to have two different shim classes even though they are
@@ -522,16 +530,32 @@ public class Hadoop23Shims extends HadoopShimsSecure {
public HadoopShims.MiniDFSShim getMiniDfs(Configuration conf,
int numDataNodes,
boolean format,
- String[] racks) throws IOException {
+ String[] racks,
+ boolean isHA) throws IOException {
configureImpersonation(conf);
- MiniDFSCluster miniDFSCluster = new MiniDFSCluster(conf, numDataNodes, format, racks);
+ MiniDFSCluster miniDFSCluster;
+ if (isHA) {
+ MiniDFSNNTopology topo = new MiniDFSNNTopology()
+ .addNameservice(new MiniDFSNNTopology.NSConf("minidfs").addNN(
+ new MiniDFSNNTopology.NNConf("nn1")).addNN(
+ new MiniDFSNNTopology.NNConf("nn2")));
+ miniDFSCluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(numDataNodes).format(format)
+ .racks(racks).nnTopology(topo).build();
+ miniDFSCluster.waitActive();
+ miniDFSCluster.transitionToActive(0);
+ } else {
+ miniDFSCluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(numDataNodes).format(format)
+ .racks(racks).build();
+ }
// Need to set the client's KeyProvider to the NN's for JKS,
// else the updates do not get flushed properly
- KeyProviderCryptoExtension keyProvider = miniDFSCluster.getNameNode().getNamesystem().getProvider();
+ KeyProviderCryptoExtension keyProvider = miniDFSCluster.getNameNode(0).getNamesystem().getProvider();
if (keyProvider != null) {
try {
- setKeyProvider(miniDFSCluster.getFileSystem().getClient(), keyProvider);
+ setKeyProvider(miniDFSCluster.getFileSystem(0).getClient(), keyProvider);
} catch (Exception err) {
throw new IOException(err);
}
@@ -571,7 +595,7 @@ public class Hadoop23Shims extends HadoopShimsSecure {
@Override
public FileSystem getFileSystem() throws IOException {
- return cluster.getFileSystem();
+ return cluster.getFileSystem(0);
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/4f9194d1/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
----------------------------------------------------------------------
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index a44d0c0..4b9119b 100644
--- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -113,6 +113,12 @@ public interface HadoopShims {
boolean format,
String[] racks) throws IOException;
+ MiniDFSShim getMiniDfs(Configuration conf,
+ int numDataNodes,
+ boolean format,
+ String[] racks,
+ boolean isHA) throws IOException;
+
/**
* Shim around the functions in MiniDFSCluster that Hive uses.
*/