You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2018/03/16 06:01:41 UTC
[2/2] hive git commit: HIVE-18281: HiveServer2 HA for LLAP and
Workload Manager (Prasanth Jayachandran reviewed by Sergey Shelukhin)
HIVE-18281: HiveServer2 HA for LLAP and Workload Manager (Prasanth Jayachandran reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/21c6a540
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/21c6a540
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/21c6a540
Branch: refs/heads/master
Commit: 21c6a5407cebc5a096cd9aa10157be05a3ea9627
Parents: 3004335
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Thu Mar 15 23:01:25 2018 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Thu Mar 15 23:01:25 2018 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 11 +
itests/hive-minikdc/pom.xml | 20 ++
itests/hive-unit-hadoop2/pom.xml | 21 +-
.../apache/hive/jdbc/TestActivePassiveHA.java | 268 +++++++++++++++
.../hive/jdbc/miniHS2/AbstractHiveService.java | 8 +-
.../org/apache/hive/jdbc/miniHS2/MiniHS2.java | 7 +
.../hive/llap/registry/ServiceRegistry.java | 11 +-
.../registry/impl/LlapFixedRegistryImpl.java | 4 +-
.../llap/registry/impl/LlapRegistryService.java | 5 +-
.../impl/LlapZookeeperRegistryImpl.java | 12 +-
.../hive/llap/security/LlapTokenClient.java | 3 +-
.../hadoop/hive/registry/RegistryUtilities.java | 52 +++
.../hadoop/hive/registry/ServiceInstance.java | 18 +-
.../hive/registry/ServiceInstanceSet.java | 12 +-
.../hive/registry/impl/ServiceInstanceBase.java | 57 +++-
.../hive/registry/impl/TezAmInstance.java | 21 +-
.../hive/registry/impl/TezAmRegistryImpl.java | 8 +-
.../hive/registry/impl/ZkRegistryBase.java | 242 +++++++++-----
.../hadoop/hive/llap/LlapBaseInputFormat.java | 5 +-
.../daemon/services/impl/LlapWebServices.java | 5 +-
.../tezplugins/LlapTaskSchedulerService.java | 10 +
.../hive/ql/exec/tez/TezSessionPoolManager.java | 9 +-
.../hive/ql/exec/tez/TezSessionPoolSession.java | 11 +-
.../hive/ql/exec/tez/TezSessionState.java | 5 +-
.../apache/hadoop/hive/ql/exec/tez/Utils.java | 3 +-
.../physical/LlapClusterStateForCompile.java | 3 +-
.../hive/ql/exec/tez/TestTezSessionPool.java | 13 +-
service/pom.xml | 43 ++-
.../server/HS2ActivePassiveHARegistry.java | 325 +++++++++++++++++++
.../HS2ActivePassiveHARegistryClient.java | 54 +++
.../apache/hive/service/server/HiveServer2.java | 313 ++++++++++++------
.../server/HiveServer2HAInstanceSet.java | 29 ++
.../service/server/HiveServer2Instance.java | 108 ++++++
.../service/servlet/HS2LeadershipStatus.java | 48 +++
.../apache/hive/service/servlet/HS2Peers.java | 75 +++++
35 files changed, 1559 insertions(+), 280 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 8bbf1be..06efd02 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1916,6 +1916,10 @@ public class HiveConf extends Configuration {
new TimeValidator(TimeUnit.MILLISECONDS),
"ZooKeeper client's session timeout (in milliseconds). The client is disconnected, and as a result, all locks released, \n" +
"if a heartbeat is not sent in the timeout."),
+ HIVE_ZOOKEEPER_CONNECTION_TIMEOUT("hive.zookeeper.connection.timeout", "15s",
+ new TimeValidator(TimeUnit.SECONDS),
+ "ZooKeeper client's connection timeout in seconds. Connection timeout * hive.zookeeper.connection.max.retries\n" +
+ "with exponential backoff is when curator client deems connection is lost to zookeeper."),
HIVE_ZOOKEEPER_NAMESPACE("hive.zookeeper.namespace", "hive_zookeeper_namespace",
"The parent node under which all ZooKeeper nodes are created."),
HIVE_ZOOKEEPER_CLEAN_EXTRA_NODES("hive.zookeeper.clean.extra.nodes", false,
@@ -2465,6 +2469,13 @@ public class HiveConf extends Configuration {
"If true, the HiveServer2 WebUI will be secured with PAM."),
// Tez session settings
+ HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE("hive.server2.active.passive.ha.enable", false,
+ "Whether HiveServer2 Active/Passive High Availability be enabled when Hive Interactive sessions are enabled. " +
+ "This will also require hive.server2.support.dynamic.service.discovery to be enabled."),
+ HIVE_SERVER2_ACTIVE_PASSIVE_HA_REGISTRY_NAMESPACE("hive.server2.active.passive.ha.registry.namespace",
+ "hs2ActivePassiveHA",
+ "When HiveServer2 Active/Passive High Availability is enabled, uses this namespace for registering HS2\n" +
+ "instances with zookeeper"),
HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE("hive.server2.tez.interactive.queue", "",
"A single YARN queues to use for Hive Interactive sessions. When this is specified,\n" +
"workload management is enabled and used for these sessions."),
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/itests/hive-minikdc/pom.xml
----------------------------------------------------------------------
diff --git a/itests/hive-minikdc/pom.xml b/itests/hive-minikdc/pom.xml
index 337535a..1e40d9d 100644
--- a/itests/hive-minikdc/pom.xml
+++ b/itests/hive-minikdc/pom.xml
@@ -117,6 +117,26 @@
<scope>test</scope>
<classifier>tests</classifier>
</dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-dag</artifactId>
+ <version>${tez.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-web-proxy</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<!-- test inter-project -->
<dependency>
<groupId>junit</groupId>
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/itests/hive-unit-hadoop2/pom.xml
----------------------------------------------------------------------
diff --git a/itests/hive-unit-hadoop2/pom.xml b/itests/hive-unit-hadoop2/pom.xml
index fb31fd4..85a6145 100644
--- a/itests/hive-unit-hadoop2/pom.xml
+++ b/itests/hive-unit-hadoop2/pom.xml
@@ -53,7 +53,26 @@
<artifactId>hive-exec</artifactId>
<version>${project.version}</version>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-dag</artifactId>
+ <version>${tez.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-web-proxy</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<!-- dependencies are always listed in sorted order by groupId, artifactId -->
<!-- test intra-project -->
<dependency>
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java
new file mode 100644
index 0000000..26acbd7
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.jdbc;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.sql.Connection;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.registry.impl.ZkRegistryBase;
+import org.apache.hive.jdbc.miniHS2.MiniHS2;
+import org.apache.hive.service.server.HS2ActivePassiveHARegistry;
+import org.apache.hive.service.server.HS2ActivePassiveHARegistryClient;
+import org.apache.hive.service.server.HiveServer2Instance;
+import org.apache.hive.service.servlet.HS2Peers;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestActivePassiveHA {
+ private MiniHS2 miniHS2_1 = null;
+ private MiniHS2 miniHS2_2 = null;
+ private static TestingServer zkServer;
+ private Connection hs2Conn = null;
+ private HiveConf hiveConf1;
+ private HiveConf hiveConf2;
+
+ @BeforeClass
+ public static void beforeTest() throws Exception {
+ MiniHS2.cleanupLocalDir();
+ zkServer = new TestingServer();
+ Class.forName(MiniHS2.getJdbcDriverName());
+ }
+
+ @AfterClass
+ public static void afterTest() throws Exception {
+ if (zkServer != null) {
+ zkServer.close();
+ zkServer = null;
+ }
+ MiniHS2.cleanupLocalDir();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ hiveConf1 = new HiveConf();
+ hiveConf1.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+ // Set up zookeeper dynamic service discovery configs
+ setHAConfigs(hiveConf1);
+ miniHS2_1 = new MiniHS2.Builder().withConf(hiveConf1).cleanupLocalDirOnStartup(false).build();
+ hiveConf2 = new HiveConf();
+ hiveConf2.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+ // Set up zookeeper dynamic service discovery configs
+ setHAConfigs(hiveConf2);
+ miniHS2_2 = new MiniHS2.Builder().withConf(hiveConf2).cleanupLocalDirOnStartup(false).build();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (hs2Conn != null) {
+ hs2Conn.close();
+ }
+ if ((miniHS2_1 != null) && miniHS2_1.isStarted()) {
+ miniHS2_1.stop();
+ }
+ if ((miniHS2_2 != null) && miniHS2_2.isStarted()) {
+ miniHS2_2.stop();
+ }
+ }
+
+ private static void setHAConfigs(Configuration conf) {
+ conf.setBoolean(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY.varname, true);
+ conf.set(ConfVars.HIVE_ZOOKEEPER_QUORUM.varname, zkServer.getConnectString());
+ final String zkRootNamespace = "hs2test";
+ conf.set(ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE.varname, zkRootNamespace);
+ conf.setBoolean(ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE.varname, true);
+ conf.setTimeDuration(ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT.varname, 2, TimeUnit.SECONDS);
+ conf.setTimeDuration(ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME.varname, 100, TimeUnit.MILLISECONDS);
+ conf.setInt(ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES.varname, 1);
+ }
+
+ @Test(timeout = 60000)
+ public void testActivePassive() throws Exception {
+ Map<String, String> confOverlay = new HashMap<>();
+ hiveConf1.set(ZkRegistryBase.UNIQUE_IDENTIFIER, UUID.randomUUID().toString());
+ miniHS2_1.start(confOverlay);
+ while(!miniHS2_1.isStarted()) {
+ Thread.sleep(100);
+ }
+
+ hiveConf2.set(ZkRegistryBase.UNIQUE_IDENTIFIER, UUID.randomUUID().toString());
+ miniHS2_2.start(confOverlay);
+ while(!miniHS2_2.isStarted()) {
+ Thread.sleep(100);
+ }
+
+ assertEquals(true, miniHS2_1.isLeader());
+ String url = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader";
+ assertEquals("true", sendGet(url));
+
+ assertEquals(false, miniHS2_2.isLeader());
+ url = "http://localhost:" + hiveConf2.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader";
+ assertEquals("false", sendGet(url));
+
+ url = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/peers";
+ String resp = sendGet(url);
+ ObjectMapper objectMapper = new ObjectMapper();
+ HS2Peers.HS2Instances hs2Peers = objectMapper.readValue(resp, HS2Peers.HS2Instances.class);
+ int port1 = Integer.parseInt(hiveConf1.get(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname));
+ assertEquals(2, hs2Peers.getHiveServer2Instances().size());
+ for (HiveServer2Instance hsi : hs2Peers.getHiveServer2Instances()) {
+ if (hsi.getRpcPort() == port1) {
+ assertEquals(true, hsi.isLeader());
+ } else {
+ assertEquals(false, hsi.isLeader());
+ }
+ }
+
+ Configuration conf = new Configuration();
+ setHAConfigs(conf);
+ HS2ActivePassiveHARegistry client = HS2ActivePassiveHARegistryClient.getClient(conf);
+ List<HiveServer2Instance> hs2Instances = new ArrayList<>(client.getAll());
+ assertEquals(2, hs2Instances.size());
+ List<HiveServer2Instance> leaders = new ArrayList<>();
+ List<HiveServer2Instance> standby = new ArrayList<>();
+ for (HiveServer2Instance instance : hs2Instances) {
+ if (instance.isLeader()) {
+ leaders.add(instance);
+ } else {
+ standby.add(instance);
+ }
+ }
+ assertEquals(1, leaders.size());
+ assertEquals(1, standby.size());
+
+ miniHS2_1.stop();
+
+ while(!miniHS2_2.isStarted()) {
+ Thread.sleep(100);
+ }
+ assertEquals(true, miniHS2_2.isLeader());
+ url = "http://localhost:" + hiveConf2.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader";
+ assertEquals("true", sendGet(url));
+
+ while (client.getAll().size() != 1) {
+ Thread.sleep(100);
+ }
+
+ client = HS2ActivePassiveHARegistryClient.getClient(conf);
+ hs2Instances = new ArrayList<>(client.getAll());
+ assertEquals(1, hs2Instances.size());
+ leaders = new ArrayList<>();
+ standby = new ArrayList<>();
+ for (HiveServer2Instance instance : hs2Instances) {
+ if (instance.isLeader()) {
+ leaders.add(instance);
+ } else {
+ standby.add(instance);
+ }
+ }
+ assertEquals(1, leaders.size());
+ assertEquals(0, standby.size());
+
+ url = "http://localhost:" + hiveConf2.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/peers";
+ resp = sendGet(url);
+ objectMapper = new ObjectMapper();
+ hs2Peers = objectMapper.readValue(resp, HS2Peers.HS2Instances.class);
+ int port2 = Integer.parseInt(hiveConf2.get(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname));
+ assertEquals(1, hs2Peers.getHiveServer2Instances().size());
+ for (HiveServer2Instance hsi : hs2Peers.getHiveServer2Instances()) {
+ if (hsi.getRpcPort() == port2) {
+ assertEquals(true, hsi.isLeader());
+ } else {
+ assertEquals(false, hsi.isLeader());
+ }
+ }
+
+ // start 1st server again
+ hiveConf1.set(ZkRegistryBase.UNIQUE_IDENTIFIER, UUID.randomUUID().toString());
+ miniHS2_1.start(confOverlay);
+
+ while(!miniHS2_1.isStarted()) {
+ Thread.sleep(100);
+ }
+ assertEquals(false, miniHS2_1.isLeader());
+ url = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader";
+ assertEquals("false", sendGet(url));
+
+ while (client.getAll().size() != 2) {
+ Thread.sleep(100);
+ }
+
+ client = HS2ActivePassiveHARegistryClient.getClient(conf);
+ hs2Instances = new ArrayList<>(client.getAll());
+ assertEquals(2, hs2Instances.size());
+ leaders = new ArrayList<>();
+ standby = new ArrayList<>();
+ for (HiveServer2Instance instance : hs2Instances) {
+ if (instance.isLeader()) {
+ leaders.add(instance);
+ } else {
+ standby.add(instance);
+ }
+ }
+ assertEquals(1, leaders.size());
+ assertEquals(1, standby.size());
+
+ url = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/peers";
+ resp = sendGet(url);
+ objectMapper = new ObjectMapper();
+ hs2Peers = objectMapper.readValue(resp, HS2Peers.HS2Instances.class);
+ port2 = Integer.parseInt(hiveConf2.get(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname));
+ assertEquals(2, hs2Peers.getHiveServer2Instances().size());
+ for (HiveServer2Instance hsi : hs2Peers.getHiveServer2Instances()) {
+ if (hsi.getRpcPort() == port2) {
+ assertEquals(true, hsi.isLeader());
+ } else {
+ assertEquals(false, hsi.isLeader());
+ }
+ }
+ }
+
+ private String sendGet(String url) throws Exception {
+ // HTTP GET helper: returns the body with line breaks dropped; the reader is closed even if a read fails.
+ URL obj = new URL(url);
+ HttpURLConnection con = (HttpURLConnection) obj.openConnection();
+ con.setRequestMethod("GET");
+ StringBuilder response = new StringBuilder();
+ try (BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()))) {
+ for (String line = in.readLine(); line != null; line = in.readLine()) {
+ response.append(line);
+ }
+ }
+ return response.toString();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/AbstractHiveService.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/AbstractHiveService.java b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/AbstractHiveService.java
index 6cab8cd..d21b764 100644
--- a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/AbstractHiveService.java
+++ b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/AbstractHiveService.java
@@ -35,14 +35,16 @@ public abstract class AbstractHiveService {
private String hostname;
private int binaryPort;
private int httpPort;
+ private int webPort;
private boolean startedHiveService = false;
private List<String> addedProperties = new ArrayList<String>();
- public AbstractHiveService(HiveConf hiveConf, String hostname, int binaryPort, int httpPort) {
+ public AbstractHiveService(HiveConf hiveConf, String hostname, int binaryPort, int httpPort, int webPort) {
this.hiveConf = hiveConf;
this.hostname = hostname;
this.binaryPort = binaryPort;
this.httpPort = httpPort;
+ this.webPort = webPort;
}
/**
@@ -136,6 +138,10 @@ public abstract class AbstractHiveService {
return httpPort;
}
+ public int getWebPort() {
+ return webPort;
+ }
+
public boolean isStarted() {
return startedHiveService;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
index 8bbf8a4..997726c 100644
--- a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
+++ b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
@@ -217,6 +217,8 @@ public class MiniHS2 extends AbstractHiveService {
(usePortsFromConf ? hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT) : MetaStoreTestUtils
.findFreePort()),
(usePortsFromConf ? hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT) : MetaStoreTestUtils
+ .findFreePort()),
+ (usePortsFromConf ? hiveConf.getIntVar(ConfVars.HIVE_SERVER2_WEBUI_PORT) : MetaStoreTestUtils
.findFreePort()));
hiveConf.setLongVar(ConfVars.HIVE_SERVER2_MAX_START_ATTEMPTS, 3l);
hiveConf.setTimeVar(ConfVars.HIVE_SERVER2_SLEEP_INTERVAL_BETWEEN_START_ATTEMPTS, 10,
@@ -306,6 +308,7 @@ public class MiniHS2 extends AbstractHiveService {
hiveConf.setVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST, getHost());
hiveConf.setIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT, getBinaryPort());
hiveConf.setIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT, getHttpPort());
+ hiveConf.setIntVar(ConfVars.HIVE_SERVER2_WEBUI_PORT, getWebPort());
Path scratchDir = new Path(baseFsDir, "scratch");
// Create root scratchdir with write all, so that user impersonation has no issues.
@@ -404,6 +407,10 @@ public class MiniHS2 extends AbstractHiveService {
}
+ public boolean isLeader() {
+ return hiveServer2.isLeader();
+ }
+
public CLIServiceClient getServiceClient() {
verifyStarted();
return getServiceClientInternal();
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
index 5d7f813..6178b4b 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
@@ -14,13 +14,16 @@
package org.apache.hadoop.hive.llap.registry;
import java.io.IOException;
+
+import org.apache.hadoop.hive.registry.ServiceInstance;
+import org.apache.hadoop.hive.registry.ServiceInstanceSet;
import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
import org.apache.hadoop.yarn.api.records.ApplicationId;
/**
* ServiceRegistry interface for switching between fixed host and dynamic registry implementations.
*/
-public interface ServiceRegistry {
+public interface ServiceRegistry<T extends ServiceInstance> {
/**
* Start the service registry
@@ -49,14 +52,14 @@ public interface ServiceRegistry {
* @param clusterReadyTimeoutMs The time to wait for the cluster to be ready, if it's not
* started yet. 0 means do not wait.
*/
- LlapServiceInstanceSet getInstances(String component, long clusterReadyTimeoutMs) throws IOException;
+ ServiceInstanceSet<T> getInstances(String component, long clusterReadyTimeoutMs) throws
+ IOException;
/**
* Adds state change listeners for service instances.
* @param listener - state change listener
*/
- void registerStateChangeListener(
- ServiceInstanceStateChangeListener<LlapServiceInstance> listener) throws IOException;
+ void registerStateChangeListener(ServiceInstanceStateChangeListener<T> listener) throws IOException;
/**
* @return The application ID of the LLAP cluster.
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
index c88198f..f99d86c 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
+import org.apache.hadoop.hive.registry.ServiceInstance;
import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.StringUtils;
@@ -267,8 +268,7 @@ public class LlapFixedRegistryImpl implements ServiceRegistry {
}
@Override
- public void registerStateChangeListener(
- final ServiceInstanceStateChangeListener<LlapServiceInstance> listener) {
+ public void registerStateChangeListener(final ServiceInstanceStateChangeListener listener) throws IOException {
// nothing to set
LOG.warn("Callbacks for instance state changes are not supported in fixed registry.");
}
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
index 80a6aba..3bda40b 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
+import org.apache.hadoop.hive.registry.ServiceInstanceSet;
import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.service.AbstractService;
@@ -35,7 +36,7 @@ public class LlapRegistryService extends AbstractService {
private static final Logger LOG = LoggerFactory.getLogger(LlapRegistryService.class);
- private ServiceRegistry registry = null;
+ private ServiceRegistry<LlapServiceInstance> registry = null;
private final boolean isDaemon;
private boolean isDynamic = false;
private String identity = "(pending)";
@@ -136,7 +137,7 @@ public class LlapRegistryService extends AbstractService {
}
public LlapServiceInstanceSet getInstances(long clusterReadyTimeoutMs) throws IOException {
- return this.registry.getInstances("LLAP", clusterReadyTimeoutMs);
+ return (LlapServiceInstanceSet) this.registry.getInstances("LLAP", clusterReadyTimeoutMs);
}
public void registerStateChangeListener(
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
index 8339230..f5d6202 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
@@ -53,7 +53,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LlapZookeeperRegistryImpl
- extends ZkRegistryBase<LlapServiceInstance> implements ServiceRegistry {
+ extends ZkRegistryBase<LlapServiceInstance> implements ServiceRegistry<LlapServiceInstance> {
private static final Logger LOG = LoggerFactory.getLogger(LlapZookeeperRegistryImpl.class);
/**
@@ -65,8 +65,6 @@ public class LlapZookeeperRegistryImpl
private static final String IPC_LLAP = "llap";
private static final String IPC_OUTPUTFORMAT = "llapoutputformat";
private final static String NAMESPACE_PREFIX = "llap-";
- private final static String USER_SCOPE_PATH_PREFIX = "user-";
- private static final String WORKER_PREFIX = "worker-";
private static final String SLOT_PREFIX = "slot-";
private static final String SASL_LOGIN_CONTEXT_NAME = "LlapZooKeeperClient";
@@ -79,7 +77,7 @@ public class LlapZookeeperRegistryImpl
public LlapZookeeperRegistryImpl(String instanceName, Configuration conf) {
super(instanceName, conf,
HiveConf.getVar(conf, ConfVars.LLAP_ZK_REGISTRY_NAMESPACE), NAMESPACE_PREFIX,
- USER_SCOPE_PATH_PREFIX, WORKER_PREFIX,
+ USER_SCOPE_PATH_PREFIX, WORKER_PREFIX, WORKER_GROUP,
LlapProxy.isDaemon() ? SASL_LOGIN_CONTEXT_NAME : null,
HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_PRINCIPAL),
HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE),
@@ -225,7 +223,7 @@ public class LlapZookeeperRegistryImpl
@Override
public String toString() {
- return "DynamicServiceInstance [id=" + getWorkerIdentity() + ", host=" + host + ":" + rpcPort +
+ return "DynamicServiceInstance [id=" + getWorkerIdentity() + ", host=" + getHost() + ":" + getRpcPort() +
" with resources=" + getResource() + ", shufflePort=" + getShufflePort() +
", servicesAddress=" + getServicesAddress() + ", mgmtPort=" + getManagementPort() + "]";
}
@@ -327,9 +325,9 @@ public class LlapZookeeperRegistryImpl
if (data == null) continue;
String nodeName = extractNodeName(childData);
if (nodeName.startsWith(WORKER_PREFIX)) {
- Set<LlapServiceInstance> instances = getInstancesByPath(childData.getPath());
+ LlapServiceInstance instances = getInstanceByPath(childData.getPath());
if (instances != null) {
- unsorted.addAll(instances);
+ unsorted.add(instances);
}
} else if (nodeName.startsWith(SLOT_PREFIX)) {
slotByWorker.put(extractWorkerIdFromSlot(childData),
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
index 32d5caa..3208e21 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
+import org.apache.hadoop.hive.registry.ServiceInstanceSet;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
@@ -55,7 +56,7 @@ public class LlapTokenClient {
private final SocketFactory socketFactory;
private final RetryPolicy retryPolicy;
private final Configuration conf;
- private LlapServiceInstanceSet activeInstances;
+ private ServiceInstanceSet<LlapServiceInstance> activeInstances;
private Collection<LlapServiceInstance> lastKnownInstances;
private LlapManagementProtocolClientImpl client;
private LlapServiceInstance clientInstance;
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/registry/RegistryUtilities.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/RegistryUtilities.java b/llap-client/src/java/org/apache/hadoop/hive/registry/RegistryUtilities.java
new file mode 100644
index 0000000..e069e43
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/RegistryUtilities.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.registry;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.UUID;
+
+/**
+ * Static helpers used by the registry code: local host name lookup and unique id generation.
+ */
+public class RegistryUtilities {
+ // Fallback value returned when the local host address cannot be resolved.
+ private static final String LOCALHOST = "localhost";
+
+ /**
+ * Returns the host name of the local host as reported by InetAddress.
+ *
+ * @return local host name, or "localhost" if the local host cannot be resolved
+ */
+ public static String getHostName() {
+ try {
+ return InetAddress.getLocalHost().getHostName();
+ } catch (UnknownHostException e) {
+ return LOCALHOST;
+ }
+ }
+
+ /**
+ * Returns the FQDN of the local host after doing a reverse DNS lookup.
+ *
+ * @return FQDN of the local host, or "localhost" if the local host cannot be resolved
+ */
+ public static String getCanonicalHostName() {
+ try {
+ return InetAddress.getLocalHost().getCanonicalHostName();
+ } catch (UnknownHostException e) {
+ return LOCALHOST;
+ }
+ }
+
+ /**
+ * Generates a new random UUID.
+ *
+ * @return string form of a freshly generated random UUID
+ */
+ public static String getUUID() {
+ return String.valueOf(UUID.randomUUID());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java
index 908b3bb..4493e99 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java
@@ -21,27 +21,27 @@ public interface ServiceInstance {
* Worker identity is a UUID (unique across restarts), to identify a node which died & was brought
* back on the same host/port
*/
- public abstract String getWorkerIdentity();
+ String getWorkerIdentity();
/**
* Hostname of the service instance
*
- * @return
+ * @return service hostname
*/
- public abstract String getHost();
+ String getHost();
/**
* RPC Endpoint for service instance
- *
- * @return
+ *
+ * @return rpc port
*/
- public int getRpcPort();
+ int getRpcPort();
/**
* Config properties of the Service Instance (llap.daemon.*)
- *
- * @return
+ *
+ * @return properties
*/
- public abstract Map<String, String> getProperties();
+ Map<String, String> getProperties();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java
index 34fba5c..63178cc 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java
@@ -29,15 +29,15 @@ public interface ServiceInstanceSet<InstanceType extends ServiceInstance> {
* The worker identity does not collide between restarts, so each restart will have a unique id,
* while having the same host/ip pair.
*
- * @return
+ * @return instance list
*/
Collection<InstanceType> getAll();
/**
* Get an instance by worker identity.
*
- * @param name
- * @return
+ * @param name worker id
+ * @return instance
*/
InstanceType getInstance(String name);
@@ -46,13 +46,13 @@ public interface ServiceInstanceSet<InstanceType extends ServiceInstance> {
*
* The list could include dead and alive instances.
*
- * @param host
- * @return
+ * @param host hostname
+ * @return instance list
*/
Set<InstanceType> getByHost(String host);
/**
- * Get number of instances in the currently availabe.
+ * Get the number of instances currently available.
*
* @return - number of instances
*/
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java
index db3d788..de8910c 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java
@@ -15,6 +15,8 @@ package org.apache.hadoop.hive.registry.impl;
import java.io.IOException;
import java.util.Map;
+import java.util.Objects;
+
import org.apache.hadoop.hive.registry.ServiceInstance;
import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
import org.apache.hadoop.registry.client.types.AddressTypes;
@@ -25,26 +27,27 @@ import org.slf4j.LoggerFactory;
public class ServiceInstanceBase implements ServiceInstance {
private static final Logger LOG = LoggerFactory.getLogger(ServiceInstanceBase.class);
+ private String host;
+ private int rpcPort;
+ private String workerIdentity;
+ private Map<String, String> properties;
- protected final ServiceRecord srv;
- protected final String host;
- protected final int rpcPort;
+ // empty c'tor to make jackson happy
+ public ServiceInstanceBase() {
- public ServiceInstanceBase(ServiceRecord srv, String rpcName) throws IOException {
- this.srv = srv;
+ }
+ public ServiceInstanceBase(ServiceRecord srv, String rpcName) throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("Working with ServiceRecord: {}", srv);
}
-
final Endpoint rpc = srv.getInternalEndpoint(rpcName);
-
- this.host =
- RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
- AddressTypes.ADDRESS_HOSTNAME_FIELD);
- this.rpcPort =
- Integer.parseInt(RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
- AddressTypes.ADDRESS_PORT_FIELD));
+ this.host = RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
+ AddressTypes.ADDRESS_HOSTNAME_FIELD);
+ this.rpcPort = Integer.parseInt(RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
+ AddressTypes.ADDRESS_PORT_FIELD));
+ this.workerIdentity = srv.get(ZkRegistryBase.UNIQUE_IDENTIFIER);
+ this.properties = srv.attributes();
}
@Override
@@ -57,17 +60,19 @@ public class ServiceInstanceBase implements ServiceInstance {
}
ServiceInstanceBase other = (ServiceInstanceBase) o;
- return this.getWorkerIdentity().equals(other.getWorkerIdentity());
+ return Objects.equals(getWorkerIdentity(), other.getWorkerIdentity())
+ && Objects.equals(host, other.host)
+ && Objects.equals(rpcPort, other.rpcPort);
}
@Override
public int hashCode() {
- return getWorkerIdentity().hashCode();
+ // Hash exactly the three fields that equals() compares, and do it null-safely: worker
+ // identity and host are null on an instance created through the no-arg constructor.
+ return Objects.hash(getWorkerIdentity(), host, rpcPort);
}
@Override
public String getWorkerIdentity() {
- return srv.get(ZkRegistryBase.UNIQUE_IDENTIFIER);
+ return workerIdentity;
}
@Override
@@ -82,12 +87,28 @@ public class ServiceInstanceBase implements ServiceInstance {
@Override
public Map<String, String> getProperties() {
- return srv.attributes();
+ return properties;
+ }
+
+ // Plain setters for the instance fields. NOTE(review): presumably these exist, together with
+ // the no-arg constructor, so Jackson can populate an instance - confirm against callers.
+
+ /** Sets the host name of this service instance. */
+ public void setHost(final String host) {
+ this.host = host;
+ }
+
+ /** Sets the RPC port of this service instance. */
+ public void setRpcPort(final int rpcPort) {
+ this.rpcPort = rpcPort;
+ }
+
+ /** Sets the worker identity returned by getWorkerIdentity(). */
+ public void setWorkerIdentity(final String workerIdentity) {
+ this.workerIdentity = workerIdentity;
+ }
+
+ /** Sets the property map returned by getProperties(); the map is stored as-is, not copied. */
+ public void setProperties(final Map<String, String> properties) {
+ this.properties = properties;
}
@Override
public String toString() {
return "DynamicServiceInstance [id=" + getWorkerIdentity() + ", host="
- + host + ":" + rpcPort + "]";
+ + host + ":" + rpcPort + "]";
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java
index 0724cf5..d09cb24 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java
@@ -13,29 +13,26 @@
*/
package org.apache.hadoop.hive.registry.impl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.IOException;
import org.apache.commons.codec.binary.Base64;
-
-import com.google.common.io.ByteStreams;
-
-import org.apache.tez.common.security.JobTokenIdentifier;
-
-import org.apache.hadoop.security.token.Token;
-
-import java.io.IOException;
import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
import org.apache.hadoop.registry.client.types.AddressTypes;
import org.apache.hadoop.registry.client.types.Endpoint;
import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.security.token.Token;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.io.ByteStreams;
public class TezAmInstance extends ServiceInstanceBase {
private static final Logger LOG = LoggerFactory.getLogger(TezAmInstance.class);
private final int pluginPort;
private Token<JobTokenIdentifier> token;
- public TezAmInstance(ServiceRecord srv) throws IOException {
+ TezAmInstance(ServiceRecord srv) throws IOException {
super(srv, TezAmRegistryImpl.IPC_TEZCLIENT);
final Endpoint plugin = srv.getInternalEndpoint(TezAmRegistryImpl.IPC_PLUGIN);
if (plugin != null) {
@@ -76,7 +73,7 @@ public class TezAmInstance extends ServiceInstanceBase {
@Override
public String toString() {
- return "TezAmInstance [" + getSessionId() + ", host=" + host + ", rpcPort=" + rpcPort +
+ return "TezAmInstance [" + getSessionId() + ", host=" + getHost() + ", rpcPort=" + getRpcPort() +
", pluginPort=" + pluginPort + ", token=" + token + "]";
}
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java
index 417e571..ab02cf4 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java
@@ -37,21 +37,19 @@ public class TezAmRegistryImpl extends ZkRegistryBase<TezAmInstance> {
static final String AM_SESSION_ID = "am.session.id", AM_PLUGIN_TOKEN = "am.plugin.token",
AM_PLUGIN_JOBID = "am.plugin.jobid";
private final static String NAMESPACE_PREFIX = "tez-am-";
- private final static String USER_SCOPE_PATH_PREFIX = "user-";
- private static final String WORKER_PREFIX = "worker-";
private static final String SASL_LOGIN_CONTEXT_NAME = "TezAmZooKeeperClient";
private final String registryName;
- public static TezAmRegistryImpl create(Configuration conf, boolean b) {
+ public static TezAmRegistryImpl create(Configuration conf, boolean useSecureZk) {
String amRegistryName = HiveConf.getVar(conf, ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME);
return StringUtils.isBlank(amRegistryName) ? null
- : new TezAmRegistryImpl(amRegistryName, conf, true);
+ : new TezAmRegistryImpl(amRegistryName, conf, useSecureZk);
}
private TezAmRegistryImpl(String instanceName, Configuration conf, boolean useSecureZk) {
- super(instanceName, conf, null, NAMESPACE_PREFIX, USER_SCOPE_PATH_PREFIX, WORKER_PREFIX,
+ super(instanceName, conf, null, NAMESPACE_PREFIX, USER_SCOPE_PATH_PREFIX, WORKER_PREFIX, WORKER_GROUP,
useSecureZk ? SASL_LOGIN_CONTEXT_NAME : null,
HiveConf.getVar(conf, ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_PRINCIPAL),
HiveConf.getVar(conf, ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_KEYTAB_FILE),
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
index 17269dd..e7227a8 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
@@ -13,13 +13,7 @@
*/
package org.apache.hadoop.hive.registry.impl;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
@@ -34,6 +28,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
@@ -44,12 +39,15 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode.Mode;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.LlapUtil;
+import org.apache.hadoop.hive.registry.RegistryUtilities;
import org.apache.hadoop.hive.registry.ServiceInstance;
import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
@@ -65,6 +63,12 @@ import org.apache.zookeeper.data.Id;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
/**
* This is currently used for implementation inheritance only; it doesn't provide a unified flow
* into which one can just plug a few abstract method implementations, because providing one with
@@ -77,16 +81,18 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
private static final Logger LOG = LoggerFactory.getLogger(ZkRegistryBase.class);
private final static String SASL_NAMESPACE = "sasl";
private final static String UNSECURE_NAMESPACE = "unsecure";
-
- static final String UNIQUE_IDENTIFIER = "registry.unique.id";
- private static final UUID uniq = UUID.randomUUID();
+ protected final static String USER_SCOPE_PATH_PREFIX = "user-";
+ protected static final String WORKER_PREFIX = "worker-";
+ protected static final String WORKER_GROUP = "workers";
+ public static final String UNIQUE_IDENTIFIER = "registry.unique.id";
+ protected static final UUID UNIQUE_ID = UUID.randomUUID();
+ private static final Joiner PATH_JOINER = Joiner.on("/").skipNulls();
protected final Configuration conf;
protected final CuratorFramework zooKeeperClient;
- // userPathPrefix is the path specific to the user for which ACLs should be restrictive.
// workersPath is the directory path where all the worker znodes are located.
protected final String workersPath;
- private final String userPathPrefix, workerNodePrefix;
+ private final String workerNodePrefix;
protected final ServiceRecordMarshal encoder; // to marshal/unmarshal znode data
@@ -99,7 +105,9 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
private final String disableMessage;
private final Lock instanceCacheLock = new ReentrantLock();
- private final Map<String, Set<InstanceType>> pathToInstanceCache;
+ // there can be only one instance per path
+ private final Map<String, InstanceType> pathToInstanceCache;
+ // there can be multiple instances per node
private final Map<String, Set<InstanceType>> nodeToInstanceCache;
// The registration znode.
@@ -109,29 +117,22 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
private PathChildrenCache instancesCache; // Created on demand.
/** Local hostname. */
- protected static final String hostname;
- static {
- String localhost = "localhost";
- try {
- localhost = InetAddress.getLocalHost().getCanonicalHostName();
- } catch (UnknownHostException uhe) {
- // ignore
- }
- hostname = localhost;
- }
+ protected static final String hostname = RegistryUtilities.getCanonicalHostName();
/**
* @param rootNs A single root namespace override. Not recommended.
- * @param nsPrefix The namespace prefix to use with default namespaces.
+ * @param nsPrefix The namespace prefix to use with default namespaces (appends 'sasl' for secure else 'unsecure'
+ * to namespace prefix to get effective root namespace).
* @param userScopePathPrefix The prefix to use for the user-specific part of the path.
* @param workerPrefix The prefix to use for each worker znode.
+ * @param workerGroup group name to use for all workers
* @param zkSaslLoginContextName SASL login context name for ZK security; null if not needed.
* @param zkPrincipal ZK security principal.
* @param zkKeytab ZK security keytab.
* @param aclsConfig A config setting to use to determine if ACLs should be verified.
*/
public ZkRegistryBase(String instanceName, Configuration conf, String rootNs, String nsPrefix,
- String userScopePathPrefix, String workerPrefix,
+ String userScopePathPrefix, String workerPrefix, String workerGroup,
String zkSaslLoginContextName, String zkPrincipal, String zkKeytab, ConfVars aclsConfig) {
this.conf = new Configuration(conf);
this.saslLoginContextName = zkSaslLoginContextName;
@@ -145,29 +146,52 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
this.disableMessage = "";
}
this.conf.addResource(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
- String zkEnsemble = getQuorumServers(this.conf);
this.encoder = new RegistryUtils.ServiceRecordMarshal();
- int sessionTimeout = (int) HiveConf.getTimeVar(conf,
- ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
- int baseSleepTime = (int) HiveConf.getTimeVar(conf,
- ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS);
- int maxRetries = HiveConf.getIntVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
// sample path: /llap-sasl/hiveuser/hostname/workers/worker-0000000
// worker-0000000 is the sequence number which will be retained until session timeout. If a
// worker does not respond due to communication interruptions it will retain the same sequence
// number when it returns back. If session timeout expires, the node will be deleted and new
// addition of the same node (restart) will get next sequence number
- this.userPathPrefix = userScopePathPrefix + getZkPathUser(this.conf);
- this.workerNodePrefix = workerPrefix;
- this.workersPath = "/" + userPathPrefix + "/" + instanceName + "/workers";
+ final String userPathPrefix = userScopePathPrefix == null ? null : userScopePathPrefix + getZkPathUser(conf);
+ this.workerNodePrefix = workerPrefix == null ? WORKER_PREFIX : workerPrefix;
+ this.workersPath = "/" + PATH_JOINER.join(userPathPrefix, instanceName, workerGroup);
this.instancesCache = null;
this.stateChangeListeners = new HashSet<>();
this.pathToInstanceCache = new ConcurrentHashMap<>();
this.nodeToInstanceCache = new ConcurrentHashMap<>();
+ final String namespace = getRootNamespace(rootNs, nsPrefix);
+ ACLProvider aclProvider;
+ // get the ACL provider for the outermost path component that is non-null
+ if (userPathPrefix == null) {
+ if (instanceName == null) {
+ if (workerGroup == null) {
+ aclProvider = getACLProviderForZKPath(namespace);
+ } else {
+ aclProvider = getACLProviderForZKPath(workerGroup);
+ }
+ } else {
+ aclProvider = getACLProviderForZKPath(instanceName);
+ }
+ } else {
+ aclProvider = getACLProviderForZKPath(userScopePathPrefix);
+ }
+ this.zooKeeperClient = getZookeeperClient(conf, namespace, aclProvider);
+ this.zooKeeperClient.getConnectionStateListenable().addListener(new ZkConnectionStateListener());
+ }
+
+ /**
+ * Resolves the root ZooKeeper namespace to use.
+ *
+ * @param userProvidedNamespace explicit namespace override; returned unchanged when non-null
+ * @param defaultNamespacePrefix prefix used when no override is given; 'sasl' is appended when
+ * Hadoop security is enabled, 'unsecure' otherwise
+ * @return the effective root namespace
+ */
+ public static String getRootNamespace(String userProvidedNamespace, String defaultNamespacePrefix) {
+ final boolean isSecure = UserGroupInformation.isSecurityEnabled();
+ String rootNs = userProvidedNamespace;
+ if (rootNs == null) {
+ rootNs = defaultNamespacePrefix + (isSecure ? SASL_NAMESPACE : UNSECURE_NAMESPACE);
+ }
+ return rootNs;
+ }
+ private ACLProvider getACLProviderForZKPath(String zkPath) {
final boolean isSecure = UserGroupInformation.isSecurityEnabled();
- ACLProvider zooKeeperAclProvider = new ACLProvider() {
+ return new ACLProvider() {
@Override
public List<ACL> getDefaultAcl() {
// We always return something from getAclForPath so this should not happen.
@@ -177,31 +201,40 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
@Override
public List<ACL> getAclForPath(String path) {
- if (!isSecure || path == null || !path.contains(userPathPrefix)) {
+ if (!isSecure || path == null || !path.contains(zkPath)) {
// No security or the path is below the user path - full access.
return Lists.newArrayList(ZooDefs.Ids.OPEN_ACL_UNSAFE);
}
return createSecureAcls();
}
};
- if (rootNs == null) {
- rootNs = nsPrefix + (isSecure ? SASL_NAMESPACE : UNSECURE_NAMESPACE); // The normal path.
- }
+ }
+
+ /**
+ * Builds the Curator client used to talk to ZooKeeper. The client is only built here; this
+ * method does not start it.
+ *
+ * @param conf configuration supplying the quorum servers, session and connection timeouts,
+ * and the retry base sleep time and maximum retry count
+ * @param namespace namespace passed to the Curator builder
+ * @param zooKeeperAclProvider ACL provider passed to the Curator builder
+ * @return the built client
+ */
+ private CuratorFramework getZookeeperClient(Configuration conf, String namespace, ACLProvider zooKeeperAclProvider) {
+ String zkEnsemble = getQuorumServers(conf);
+ int sessionTimeout = (int) HiveConf.getTimeVar(conf,
+ ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
+ int connectionTimeout = (int) HiveConf.getTimeVar(conf,
+ ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+ int baseSleepTime = (int) HiveConf.getTimeVar(conf,
+ ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS);
+ int maxRetries = HiveConf.getIntVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
// Create a CuratorFramework instance to be used as the ZooKeeper client
// Use the zooKeeperAclProvider to create appropriate ACLs
- this.zooKeeperClient = CuratorFrameworkFactory.builder()
- .connectString(zkEnsemble)
- .sessionTimeoutMs(sessionTimeout)
- .aclProvider(zooKeeperAclProvider)
- .namespace(rootNs)
- .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries))
- .build();
+ return CuratorFrameworkFactory.builder()
+ .connectString(zkEnsemble)
+ .sessionTimeoutMs(sessionTimeout)
+ .connectionTimeoutMs(connectionTimeout)
+ .aclProvider(zooKeeperAclProvider)
+ .namespace(namespace)
+ .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries))
+ .build();
}
private static List<ACL> createSecureAcls() {
// Read all to the world
- List<ACL> nodeAcls = new ArrayList<ACL>(ZooDefs.Ids.READ_ACL_UNSAFE);
+ List<ACL> nodeAcls = new ArrayList<>(ZooDefs.Ids.READ_ACL_UNSAFE);
// Create/Delete/Write/Admin to creator
nodeAcls.addAll(ZooDefs.Ids.CREATOR_ALL_ACL);
return nodeAcls;
@@ -211,9 +244,9 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
* Get the ensemble server addresses from the configuration. The format is: host1:port,
* host2:port..
*
- * @param conf
+ * @param conf configuration
**/
- private String getQuorumServers(Configuration conf) {
+ private static String getQuorumServers(Configuration conf) {
String[] hosts = conf.getTrimmedStrings(ConfVars.HIVE_ZOOKEEPER_QUORUM.varname);
String port = conf.get(ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.varname,
ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.getDefaultValue());
@@ -238,7 +271,7 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
protected final String registerServiceRecord(ServiceRecord srv) throws IOException {
// restart sensitive instance id
- srv.set(UNIQUE_IDENTIFIER, uniq.toString());
+ srv.set(UNIQUE_IDENTIFIER, UNIQUE_ID.toString());
// Create a znode under the rootNamespace parent for this instance of the server
try {
@@ -275,11 +308,28 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
CloseableUtils.closeQuietly(znode);
throw (e instanceof IOException) ? (IOException)e : new IOException(e);
}
- return uniq.toString();
+ return UNIQUE_ID.toString();
}
+ /**
+ * Replaces the data stored in the registration znode with the encoded form of the given record
+ * and, when ACL checking is enabled, re-runs the ACL check.
+ *
+ * @param srv service record to encode and store
+ * @throws IOException if the update or the ACL check fails; non-IO failures are wrapped
+ */
+ protected final void updateServiceRecord(ServiceRecord srv) throws IOException {
+ try {
+ znode.setData(encoder.toBytes(srv));
+
+ if (doCheckAcls) {
+ try {
+ checkAndSetAcls();
+ } catch (Exception ex) {
+ throw new IOException("Error validating or setting ACLs. " + disableMessage, ex);
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Unable to update znode with new service record", e);
+ // NOTE(review): a failed update closes the registration znode itself, which presumably
+ // drops the existing registration - confirm this is intended rather than keeping the old data.
+ CloseableUtils.closeQuietly(znode);
+ throw (e instanceof IOException) ? (IOException) e : new IOException(e);
+ }
+ }
- protected final void initializeWithoutRegisteringInternal() throws IOException {
+ final void initializeWithoutRegisteringInternal() throws IOException {
// Create a znode under the rootNamespace parent for this instance of the server
try {
try {
@@ -345,8 +395,8 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
private void addToCache(String path, String host, InstanceType instance) {
instanceCacheLock.lock();
try {
- putInCache(path, pathToInstanceCache, instance);
- putInCache(host, nodeToInstanceCache, instance);
+ putInInstanceCache(path, pathToInstanceCache, instance);
+ putInNodeCache(host, nodeToInstanceCache, instance);
} finally {
instanceCacheLock.unlock();
}
@@ -368,14 +418,19 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
path, host, pathToInstanceCache.size(), nodeToInstanceCache.size());
}
- private void putInCache(String key, Map<String, Set<InstanceType>> cache,
+ // Records the instance under its znode path, replacing any instance previously stored for that
+ // path (there can be only one instance per path).
+ private void putInInstanceCache(String key, Map<String, InstanceType> cache,
InstanceType instance) {
+ cache.put(key, instance);
+ }
+
+ /**
+ * Adds an instance to the set of instances registered for a node (host), creating the set on
+ * first use.
+ *
+ * @param key node (host) name
+ * @param cache map from node name to the instances on that node
+ * @param instance instance to record under the key
+ */
+ private void putInNodeCache(String key, Map<String, Set<InstanceType>> cache,
+ InstanceType instance) {
Set<InstanceType> instanceSet = cache.get(key);
if (instanceSet == null) {
- instanceSet = Sets.newHashSet();
+ instanceSet = new HashSet<>();
cache.put(key, instanceSet);
}
+ // Always add: there can be multiple instances per node, so an already existing set must
+ // receive the new instance too, not only a freshly created one.
instanceSet.add(instance);
}
protected final void populateCache(PathChildrenCache instancesCache, boolean doInvokeListeners) {
@@ -403,7 +458,7 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
protected abstract InstanceType createServiceInstance(ServiceRecord srv) throws IOException;
- protected static final byte[] getWorkerData(ChildData childData, String workerNodePrefix) {
+ protected static byte[] getWorkerData(ChildData childData, String workerNodePrefix) {
if (childData == null) return null;
byte[] data = childData.getData();
if (data == null) return null;
@@ -415,8 +470,7 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
private final Logger LOG = LoggerFactory.getLogger(InstanceStateChangeListener.class);
@Override
- public void childEvent(final CuratorFramework client, final PathChildrenCacheEvent event)
- throws Exception {
+ public void childEvent(final CuratorFramework client, final PathChildrenCacheEvent event) {
Preconditions.checkArgument(client != null
&& client.getState() == CuratorFrameworkState.STARTED, "client is not started");
@@ -427,28 +481,32 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
if (!nodeName.startsWith(workerNodePrefix)) return;
LOG.info("{} for zknode {}", event.getType(), childData.getPath());
InstanceType instance = extractServiceInstance(event, childData);
- int ephSeqVersion = extractSeqNum(nodeName);
- switch (event.getType()) {
- case CHILD_ADDED:
- addToCache(childData.getPath(), instance.getHost(), instance);
- for (ServiceInstanceStateChangeListener<InstanceType> listener : stateChangeListeners) {
- listener.onCreate(instance, ephSeqVersion);
- }
- break;
- case CHILD_UPDATED:
- addToCache(childData.getPath(), instance.getHost(), instance);
- for (ServiceInstanceStateChangeListener<InstanceType> listener : stateChangeListeners) {
- listener.onUpdate(instance, ephSeqVersion);
- }
- break;
- case CHILD_REMOVED:
- removeFromCache(childData.getPath(), instance.getHost());
- for (ServiceInstanceStateChangeListener<InstanceType> listener : stateChangeListeners) {
- listener.onRemove(instance, ephSeqVersion);
+ if (instance != null) {
+ int ephSeqVersion = extractSeqNum(nodeName);
+ switch (event.getType()) {
+ case CHILD_ADDED:
+ addToCache(childData.getPath(), instance.getHost(), instance);
+ for (ServiceInstanceStateChangeListener<InstanceType> listener : stateChangeListeners) {
+ listener.onCreate(instance, ephSeqVersion);
+ }
+ break;
+ case CHILD_UPDATED:
+ addToCache(childData.getPath(), instance.getHost(), instance);
+ for (ServiceInstanceStateChangeListener<InstanceType> listener : stateChangeListeners) {
+ listener.onUpdate(instance, ephSeqVersion);
+ }
+ break;
+ case CHILD_REMOVED:
+ removeFromCache(childData.getPath(), instance.getHost());
+ for (ServiceInstanceStateChangeListener<InstanceType> listener : stateChangeListeners) {
+ listener.onRemove(instance, ephSeqVersion);
+ }
+ break;
+ default:
+ // Ignore all the other events; logged above.
}
- break;
- default:
- // Ignore all the other events; logged above.
+ } else {
+ LOG.info("instance is null for event: {} childData: {}", event.getType(), childData);
}
}
}
@@ -464,7 +522,7 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
protected final Set<InstanceType> getByHostInternal(String host) {
Set<InstanceType> byHost = nodeToInstanceCache.get(host);
- byHost = (byHost == null) ? Sets.<InstanceType>newHashSet() : byHost;
+ byHost = (byHost == null) ? Sets.newHashSet() : byHost;
if (LOG.isDebugEnabled()) {
LOG.debug("Returning " + byHost.size() + " hosts for locality allocation on " + host);
}
@@ -472,11 +530,7 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
}
+ /**
+ * Returns a snapshot of all cached instances.
+ *
+ * @return a new set holding every instance currently in the path-to-instance cache; changes to
+ * the returned set do not affect the cache
+ */
protected final Collection<InstanceType> getAllInternal() {
- Set<InstanceType> instances = new HashSet<>();
- for(Set<InstanceType> instanceSet : pathToInstanceCache.values()) {
- instances.addAll(instanceSet);
- }
- return instances;
+ return new HashSet<>(pathToInstanceCache.values());
}
private static String extractNodeName(ChildData childData) {
@@ -564,13 +618,17 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
CloseableUtils.class.getName();
}
+ // Closes only the registration znode (quietly); the instances cache and the ZooKeeper client
+ // are left open, unlike stop().
+ protected void unregisterInternal() {
+ CloseableUtils.closeQuietly(znode);
+ }
+
+ // Quietly closes, in order, the registration znode, the instances cache and the ZooKeeper
+ // client.
public void stop() {
CloseableUtils.closeQuietly(znode);
CloseableUtils.closeQuietly(instancesCache);
CloseableUtils.closeQuietly(zooKeeperClient);
}
- protected final Set<InstanceType> getInstancesByPath(String path) {
+ /**
+ * Looks up the cached instance for a znode path.
+ *
+ * @param path znode path used as the cache key
+ * @return the instance cached for the path, or null if none is cached
+ */
+ protected final InstanceType getInstanceByPath(String path) {
return pathToInstanceCache.get(path);
}
@@ -588,4 +646,12 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
throw e;
}
}
+
+ // for debugging
+ private class ZkConnectionStateListener implements ConnectionStateListener {
+ @Override
+ public void stateChanged(final CuratorFramework curatorFramework, final ConnectionState connectionState) {
+ LOG.info("Connection state change notification received. State: {}", connectionState);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
index 0120639..3aec46b 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
import org.apache.hadoop.hive.llap.tez.Converters;
+import org.apache.hadoop.hive.registry.ServiceInstanceSet;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.NullWritable;
@@ -343,7 +344,7 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
private LlapServiceInstance getServiceInstanceForHost(LlapRegistryService registryService, String host) throws IOException {
InetAddress address = InetAddress.getByName(host);
- LlapServiceInstanceSet instanceSet = registryService.getInstances();
+ ServiceInstanceSet<LlapServiceInstance> instanceSet = registryService.getInstances();
LlapServiceInstance serviceInstance = null;
// The name used in the service registry may not match the host name we're using.
@@ -375,7 +376,7 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
private LlapServiceInstance getServiceInstanceRandom(LlapRegistryService registryService) throws IOException {
- LlapServiceInstanceSet instanceSet = registryService.getInstances();
+ ServiceInstanceSet<LlapServiceInstance> instanceSet = registryService.getInstances();
LlapServiceInstance serviceInstance = null;
LOG.info("Finding random live service instance");
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
index 58bf8dc..b944fad 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
@@ -34,8 +34,10 @@ import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
import org.apache.hadoop.hive.llap.registry.impl.LlapZookeeperRegistryImpl;
+import org.apache.hadoop.hive.registry.ServiceInstanceSet;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.CompositeService;
@@ -230,7 +232,8 @@ public class LlapWebServices extends AbstractService {
}
jg.writeStringField("identity", registry.getWorkerIdentity());
jg.writeArrayFieldStart("peers");
- for (LlapServiceInstance s : registry.getInstances().getAllInstancesOrdered(false)) {
+ ServiceInstanceSet<LlapServiceInstance> instanceSet = registry.getInstances();
+ for (LlapServiceInstance s : ((LlapServiceInstanceSet) instanceSet).getAllInstancesOrdered(false)) {
jg.writeStartObject();
jg.writeStringField("identity", s.getWorkerIdentity());
jg.writeStringField("host", s.getHost());
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index 66de3b8..6ddecca 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -14,6 +14,16 @@
package org.apache.hadoop.hive.llap.tezplugins;
+import com.google.common.io.ByteArrayDataOutput;
+
+import org.apache.hadoop.hive.registry.ServiceInstanceSet;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+import org.apache.hadoop.hive.registry.impl.TezAmRegistryImpl;
+
+import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
index 46cfe56..a051f90 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
@@ -98,13 +98,18 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
protected TezSessionPoolManager() {
}
- public void startPool() throws Exception {
+ public void startPool(HiveConf conf, final WMFullResourcePlan resourcePlan) throws Exception {
if (defaultSessionPool != null) {
defaultSessionPool.start();
}
if (expirationTracker != null) {
expirationTracker.start();
}
+ initTriggers(conf);
+ if (resourcePlan != null) {
+ updateTriggers(resourcePlan);
+ LOG.info("Updated tez session pool manager with active resource plan: {}", resourcePlan.getPlan().getName());
+ }
}
public void setupPool(HiveConf conf) throws Exception {
@@ -157,8 +162,6 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
numConcurrentLlapQueries = conf.getIntVar(ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES);
llapQueue = new Semaphore(numConcurrentLlapQueries, true);
- initTriggers(conf);
-
String queueAllowedStr = HiveConf.getVar(initConf,
ConfVars.HIVE_SERVER2_TEZ_SESSION_CUSTOM_QUEUE_ALLOWED);
try {
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
index d1b3fec..d3748ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
@@ -18,25 +18,20 @@
package org.apache.hadoop.hive.ql.exec.tez;
-import com.google.common.util.concurrent.SettableFuture;
-import org.apache.hadoop.hive.registry.impl.TezAmInstance;
-import org.apache.hadoop.conf.Configuration;
import java.io.IOException;
import java.net.URISyntaxException;
-import java.util.Collection;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+
import javax.security.auth.login.LoginException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
+
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
-import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider;
-import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;
import org.apache.hadoop.hive.registry.impl.TezAmInstance;
import org.apache.tez.dag.api.TezException;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index b98fb58..046ea19 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -738,7 +738,10 @@ public class TezSessionState {
private Path createTezDir(String sessionId, String suffix) throws IOException {
// tez needs its own scratch dir (per session)
// TODO: De-link from SessionState. A TezSession can be linked to different Hive Sessions via the pool.
- Path tezDir = new Path(SessionState.get().getHdfsScratchDirURIString(), TEZ_DIR);
+ SessionState sessionState = SessionState.get();
+ String hdfsScratchDir = sessionState == null ? HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR) : sessionState
+ .getHdfsScratchDirURIString();
+ Path tezDir = new Path(hdfsScratchDir, TEZ_DIR);
tezDir = new Path(tezDir, sessionId + ((suffix == null) ? "" : ("-" + suffix)));
FileSystem fs = tezDir.getFileSystem(conf);
FsPermission fsPermission = new FsPermission(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIRPERMISSION));
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
index bc438bb..1b7321b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
@@ -26,6 +26,7 @@ import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.split.SplitLocationProvider;
@@ -52,7 +53,7 @@ public class Utils {
LOG.info("Using LLAP instance " + serviceRegistry.getApplicationId());
Collection<LlapServiceInstance> serviceInstances =
- serviceRegistry.getInstances().getAllInstancesOrdered(true);
+ serviceRegistry.getInstances().getAllInstancesOrdered(true);
Preconditions.checkArgument(!serviceInstances.isEmpty(),
"No running LLAP daemons! Please check LLAP service status and zookeeper configuration");
ArrayList<String> locations = new ArrayList<>(serviceInstances.size());
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
index a8d729d..0d1990a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
import org.apache.hadoop.hive.llap.registry.impl.InactiveServiceInstance;
import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
+import org.apache.hadoop.hive.registry.ServiceInstanceSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -112,7 +113,7 @@ public class LlapClusterStateForCompile {
return false; // Don't fail; this is best-effort.
}
}
- LlapServiceInstanceSet instances;
+ ServiceInstanceSet<LlapServiceInstance> instances;
try {
instances = svc.getInstances(10);
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
index d261623..d5b683f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
@@ -18,11 +18,14 @@
package org.apache.hadoop.hive.ql.exec.tez;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Random;
+
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.junit.Before;
@@ -90,7 +93,7 @@ public class TestTezSessionPool {
poolManager = new TestTezSessionPoolManager();
poolManager.setupPool(conf);
- poolManager.startPool();
+ poolManager.startPool(conf, null);
// this is now a LIFO operation
// draw 1 and replace
@@ -153,7 +156,7 @@ public class TestTezSessionPool {
poolManager = new TestTezSessionPoolManager();
poolManager.setupPool(conf);
- poolManager.startPool();
+ poolManager.startPool(conf, null);
TezSessionState[] sessions = new TezSessionState[12];
int[] queueCounts = new int[3];
for (int i = 0; i < sessions.length; ++i) {
@@ -234,7 +237,7 @@ public class TestTezSessionPool {
conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES, 2);
poolManager = new TestTezSessionPoolManager();
poolManager.setupPool(conf);
- poolManager.startPool();
+ poolManager.startPool(conf, null);
} catch (Exception e) {
LOG.error("Initialization error", e);
fail();
@@ -295,7 +298,7 @@ public class TestTezSessionPool {
try {
poolManager = new TestTezSessionPoolManager();
poolManager.setupPool(conf);
- poolManager.startPool();
+ poolManager.startPool(conf, null);
} catch (Exception e) {
e.printStackTrace();
fail();
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/service/pom.xml
----------------------------------------------------------------------
diff --git a/service/pom.xml b/service/pom.xml
index 9ad7555..e3774df 100644
--- a/service/pom.xml
+++ b/service/pom.xml
@@ -264,7 +264,48 @@
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-api</artifactId>
+ <version>${tez.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-runtime-library</artifactId>
+ <version>${tez.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-dag</artifactId>
+ <version>${tez.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-web-proxy</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>