You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2018/03/16 06:01:40 UTC
[1/2] hive git commit: HIVE-18281: HiveServer2 HA for LLAP and
Workload Manager (Prasanth Jayachandran reviewed by Sergey Shelukhin)
Repository: hive
Updated Branches:
refs/heads/master 300433537 -> 21c6a5407
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java b/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java
new file mode 100644
index 0000000..6514d98
--- /dev/null
+++ b/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.server;
+
+import static org.apache.hive.service.server.HiveServer2.INSTANCE_URI_CONFIG;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
+import org.apache.hadoop.hive.registry.ServiceInstanceSet;
+import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
+import org.apache.hadoop.hive.registry.impl.ZkRegistryBase;
+import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
+import org.apache.hadoop.registry.client.binding.RegistryUtils;
+import org.apache.hadoop.registry.client.types.Endpoint;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hive.service.ServiceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class HS2ActivePassiveHARegistry extends ZkRegistryBase<HiveServer2Instance> implements
+ ServiceRegistry<HiveServer2Instance>, HiveServer2HAInstanceSet {
+ private static final Logger LOG = LoggerFactory.getLogger(HS2ActivePassiveHARegistry.class);
+ static final String ACTIVE_ENDPOINT = "activeEndpoint";
+ static final String PASSIVE_ENDPOINT = "passiveEndpoint";
+ private static final String SASL_LOGIN_CONTEXT_NAME = "HS2ActivePassiveHAZooKeeperClient";
+ private static final String INSTANCE_PREFIX = "instance-";
+ private static final String INSTANCE_GROUP = "instances";
+ private static final String LEADER_LATCH_PATH = "/_LEADER";
+ private LeaderLatch leaderLatch;
+ private ServiceRecord srv;
+ private boolean isClient;
+
+ // There are 2 paths under which the instances get registered
+ // 1) Standard path used by ZkRegistryBase where all instances register themselves (also stores metadata)
+ // Secure: /hs2ActivePassiveHA-sasl/instances/instance-0000000000
+ // Unsecure: /hs2ActivePassiveHA-unsecure/instances/instance-0000000000
+ // 2) Leader latch path used for HS2 HA Active/Passive configuration where all instances register under _LEADER
+ // path but only one among them is the leader
+ // Secure: /hs2ActivePassiveHA-sasl/_LEADER/xxxx-latch-0000000000
+ // Unsecure: /hs2ActivePassiveHA-unsecure/_LEADER/xxxx-latch-0000000000
+ static HS2ActivePassiveHARegistry create(Configuration conf, boolean isClient) {
+ String zkNameSpace = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_REGISTRY_NAMESPACE);
+ Preconditions.checkArgument(!StringUtils.isBlank(zkNameSpace),
+ HiveConf.ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_REGISTRY_NAMESPACE.varname + " cannot be null or empty");
+ String principal = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL);
+ String keytab = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB);
+ String zkNameSpacePrefix = zkNameSpace + "-";
+ return new HS2ActivePassiveHARegistry(null, zkNameSpacePrefix, LEADER_LATCH_PATH, principal, keytab,
+ SASL_LOGIN_CONTEXT_NAME, conf, isClient);
+ }
+
+ private HS2ActivePassiveHARegistry(final String instanceName, final String zkNamespacePrefix,
+ final String leaderLatchPath,
+ final String krbPrincipal, final String krbKeytab, final String saslContextName, final Configuration conf,
+ final boolean isClient) {
+ super(instanceName, conf, null, zkNamespacePrefix, null, INSTANCE_PREFIX, INSTANCE_GROUP,
+ saslContextName, krbPrincipal, krbKeytab, null);
+ this.isClient = isClient;
+ leaderLatch = new LeaderLatch(zooKeeperClient, leaderLatchPath, UNIQUE_ID.toString(),
+ LeaderLatch.CloseMode.NOTIFY_LEADER);
+ }
+
+ @Override
+ public void start() throws IOException {
+ super.start();
+ if (!isClient) {
+ this.srv = getNewServiceRecord();
+ register();
+ leaderLatch.addListener(new HS2LeaderLatchListener());
+ try {
+ // all participating instances uses the same latch path, and curator randomly chooses one instance to be leader
+ // which can be verified via leaderLatch.hasLeadership()
+ leaderLatch.start();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ LOG.info("Registered HS2 with ZK. service record: {}", srv);
+ } else {
+ populateCache();
+ }
+ }
+
+ @Override
+ protected void unregisterInternal() {
+ super.unregisterInternal();
+ }
+
+ @Override
+ public String register() throws IOException {
+ updateEndpoint(srv, PASSIVE_ENDPOINT);
+ return registerServiceRecord(srv);
+ }
+
+ @Override
+ public void unregister() {
+ CloseableUtils.closeQuietly(leaderLatch);
+ unregisterInternal();
+ }
+
+ private void populateCache() throws IOException {
+ PathChildrenCache pcc = ensureInstancesCache(0);
+ populateCache(pcc, false);
+ }
+
+ @Override
+ public ServiceInstanceSet<HiveServer2Instance> getInstances(final String component, final long clusterReadyTimeoutMs)
+ throws IOException {
+ throw new IOException("Not supported to get instances by component name");
+ }
+
+ private void addActiveEndpointToServiceRecord() throws IOException {
+ addEndpointToServiceRecord(getNewServiceRecord(), ACTIVE_ENDPOINT);
+ }
+
+ private void addPassiveEndpointToServiceRecord() throws IOException {
+ addEndpointToServiceRecord(getNewServiceRecord(), PASSIVE_ENDPOINT);
+ }
+
+ private void addEndpointToServiceRecord(final ServiceRecord srv, final String endpointName) throws IOException {
+ updateEndpoint(srv, endpointName);
+ updateServiceRecord(srv);
+ }
+
+ private void updateEndpoint(final ServiceRecord srv, final String endpointName) {
+ final String instanceUri = srv.get(INSTANCE_URI_CONFIG);
+ final String[] tokens = instanceUri.split(":");
+ final String hostname = tokens[0];
+ final int port = Integer.parseInt(tokens[1]);
+ Endpoint urlEndpoint = RegistryTypeUtils.ipcEndpoint(endpointName, new InetSocketAddress(hostname, port));
+ srv.addInternalEndpoint(urlEndpoint);
+ LOG.info("Added {} endpoint to service record", urlEndpoint);
+ }
+
+ @Override
+ public void stop() {
+ CloseableUtils.closeQuietly(leaderLatch);
+ super.stop();
+ }
+
+ @Override
+ protected HiveServer2Instance createServiceInstance(final ServiceRecord srv) throws IOException {
+ Endpoint activeEndpoint = srv.getInternalEndpoint(HS2ActivePassiveHARegistry.ACTIVE_ENDPOINT);
+ return new HiveServer2Instance(srv, activeEndpoint != null ? ACTIVE_ENDPOINT : PASSIVE_ENDPOINT);
+ }
+
+ @Override
+ public synchronized void registerStateChangeListener(
+ final ServiceInstanceStateChangeListener<HiveServer2Instance> listener)
+ throws IOException {
+ super.registerStateChangeListener(listener);
+ }
+
+ @Override
+ public ApplicationId getApplicationId() throws IOException {
+ throw new IOException("Not supported until HS2 runs as YARN application");
+ }
+
+ @Override
+ protected String getZkPathUser(final Configuration conf) {
+ return RegistryUtils.currentUser();
+ }
+
+ private boolean hasLeadership() {
+ return leaderLatch.hasLeadership();
+ }
+
+ private class HS2LeaderLatchListener implements LeaderLatchListener {
+
+ // leadership state changes and sending out notifications to listener happens inside synchronous method in curator.
+ // Do only lightweight actions in main-event handler thread. Time consuming operations are handled via separate
+ // executor service registered via registerLeaderLatchListener().
+ @Override
+ public void isLeader() {
+ // only leader publishes instance uri as endpoint which will be used by clients to make connections to HS2 via
+ // service discovery.
+ try {
+ if (!hasLeadership()) {
+ LOG.info("isLeader notification received but hasLeadership returned false.. awaiting..");
+ leaderLatch.await();
+ }
+ addActiveEndpointToServiceRecord();
+ LOG.info("HS2 instance in ACTIVE mode. Service record: {}", srv);
+ } catch (Exception e) {
+ throw new ServiceException("Unable to add active endpoint to service record", e);
+ }
+ }
+
+ @Override
+ public void notLeader() {
+ try {
+ if (hasLeadership()) {
+ LOG.info("notLeader notification received but hasLeadership returned true.. awaiting..");
+ leaderLatch.await();
+ }
+ addPassiveEndpointToServiceRecord();
+ LOG.info("HS2 instance lost leadership. Switched to PASSIVE standby mode. Service record: {}", srv);
+ } catch (Exception e) {
+ throw new ServiceException("Unable to add passive endpoint to service record", e);
+ }
+ }
+ }
+
+ @Override
+ public HiveServer2Instance getLeader() {
+ for (HiveServer2Instance hs2Instance : getAll()) {
+ if (hs2Instance.isLeader()) {
+ return hs2Instance;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public Collection<HiveServer2Instance> getAll() {
+ return getAllInternal();
+ }
+
+ @Override
+ public HiveServer2Instance getInstance(final String instanceId) {
+ for (HiveServer2Instance hs2Instance : getAll()) {
+ if (hs2Instance.getWorkerIdentity().equals(instanceId)) {
+ return hs2Instance;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public Set<HiveServer2Instance> getByHost(final String host) {
+ return getByHostInternal(host);
+ }
+
+ @Override
+ public int size() {
+ return sizeInternal();
+ }
+
+ /**
+ * If leadership related notifications is desired, use this method to register leader latch listener.
+ *
+ * @param latchListener - listener
+ * @param executorService - event handler executor service
+ */
+ void registerLeaderLatchListener(final LeaderLatchListener latchListener, final ExecutorService executorService) {
+ leaderLatch.addListener(latchListener, executorService);
+ }
+
+ private Map<String, String> getConfsToPublish() {
+ final Map<String, String> confsToPublish = new HashMap<>();
+ // Hostname
+ confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname,
+ conf.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname));
+ // Hostname:port
+ confsToPublish.put(INSTANCE_URI_CONFIG, conf.get(INSTANCE_URI_CONFIG));
+ confsToPublish.put(UNIQUE_IDENTIFIER, UNIQUE_ID.toString());
+ // Transport mode
+ confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname,
+ conf.get(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname));
+ // Transport specific confs
+ if (HiveServer2.isHTTPTransportMode(conf)) {
+ confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT.varname,
+ conf.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT.varname));
+ confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH.varname,
+ conf.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH.varname));
+ } else {
+ confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT.varname,
+ conf.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT.varname));
+ confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_SASL_QOP.varname,
+ conf.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_SASL_QOP.varname));
+ }
+ // Auth specific confs
+ confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION.varname,
+ conf.get(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION.varname));
+ if (HiveServer2.isKerberosAuthMode(conf)) {
+ confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL.varname,
+ conf.get(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL.varname));
+ }
+ // SSL conf
+ confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_USE_SSL.varname,
+ conf.get(HiveConf.ConfVars.HIVE_SERVER2_USE_SSL.varname));
+ return confsToPublish;
+ }
+
+ private ServiceRecord getNewServiceRecord() {
+ ServiceRecord srv = new ServiceRecord();
+ final Map<String, String> confsToPublish = getConfsToPublish();
+ for (Map.Entry<String, String> entry : confsToPublish.entrySet()) {
+ srv.set(entry.getKey(), entry.getValue());
+ }
+ return srv;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistryClient.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistryClient.java b/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistryClient.java
new file mode 100644
index 0000000..512d4e8
--- /dev/null
+++ b/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistryClient.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hive.service.server;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.registry.impl.ZkRegistryBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class HS2ActivePassiveHARegistryClient {
+ private static final Logger LOG = LoggerFactory.getLogger(HS2ActivePassiveHARegistryClient.class);
+ private static final Map<String, HS2ActivePassiveHARegistry> hs2Registries = new HashMap<>();
+
+ /**
+ * Helper method to get a HiveServer2HARegistry instance to read from the registry. Only used by clients (JDBC),
+ * service discovery to connect to active HS2 instance in Active/Passive HA configuration.
+ *
+ * @param conf {@link Configuration} instance which contains service registry information.
+ * @return HiveServer2HARegistry
+ */
+ public static synchronized HS2ActivePassiveHARegistry getClient(Configuration conf) throws IOException {
+ String namespace = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_REGISTRY_NAMESPACE);
+ Preconditions.checkArgument(!StringUtils.isBlank(namespace),
+ HiveConf.ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_REGISTRY_NAMESPACE.varname + " cannot be null or empty");
+ String nsKey = ZkRegistryBase.getRootNamespace(null, namespace + "-");
+ HS2ActivePassiveHARegistry registry = hs2Registries.get(nsKey);
+ if (registry == null) {
+ registry = HS2ActivePassiveHARegistry.create(conf, true);
+ registry.start();
+ hs2Registries.put(nsKey, registry);
+ } else {
+ LOG.debug("Returning cached registry client for nsKey: {}", nsKey);
+ }
+ return registry;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/service/src/java/org/apache/hive/service/server/HiveServer2.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index b7ece2b..5b792ac 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -27,10 +27,11 @@ import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
@@ -38,6 +39,7 @@ import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -45,8 +47,10 @@ import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.JvmPauseMonitor;
import org.apache.hadoop.hive.common.LogUtils;
import org.apache.hadoop.hive.common.LogUtils.LogInitializationException;
@@ -69,6 +73,7 @@ import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveMaterializedViewsRegistry;
import org.apache.hadoop.hive.ql.session.ClearDanglingScratchDir;
+import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.shims.Utils;
@@ -86,6 +91,8 @@ import org.apache.hive.service.cli.CLIService;
import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService;
import org.apache.hive.service.cli.thrift.ThriftCLIService;
import org.apache.hive.service.cli.thrift.ThriftHttpCLIService;
+import org.apache.hive.service.servlet.HS2LeadershipStatus;
+import org.apache.hive.service.servlet.HS2Peers;
import org.apache.hive.service.servlet.QueryProfileServlet;
import org.apache.logging.log4j.util.Strings;
import org.apache.zookeeper.CreateMode;
@@ -101,6 +108,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* HiveServer2.
@@ -109,16 +117,27 @@ import com.google.common.collect.Lists;
public class HiveServer2 extends CompositeService {
private static CountDownLatch deleteSignal;
private static final Logger LOG = LoggerFactory.getLogger(HiveServer2.class);
+ public static final String INSTANCE_URI_CONFIG = "hive.server2.instance.uri";
+ private static final int SHUTDOWN_TIME = 60;
private CLIService cliService;
private ThriftCLIService thriftCLIService;
private PersistentEphemeralNode znode;
- private String znodePath;
private CuratorFramework zooKeeperClient;
private boolean deregisteredWithZooKeeper = false; // Set to true only when deregistration happens
private HttpServer webServer; // Web UI
private TezSessionPoolManager tezSessionPoolManager;
private WorkloadManager wm;
private PamAuthenticator pamAuthenticator;
+ private Map<String, String> confsToPublish = new HashMap<String, String>();
+ private String serviceUri;
+ private boolean serviceDiscovery;
+ private boolean activePassiveHA;
+ private LeaderLatchListener leaderLatchListener;
+ private ExecutorService leaderActionsExecutorService;
+ private HS2ActivePassiveHARegistry hs2HARegistry;
+ private Hive sessionHive;
+ private String wmQueue;
+ private AtomicBoolean isLeader = new AtomicBoolean(false);
public HiveServer2() {
super(HiveServer2.class.getSimpleName());
@@ -140,13 +159,6 @@ public class HiveServer2 extends CompositeService {
if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_METRICS_ENABLED)) {
MetricsFactory.init(hiveConf);
}
-
- // will be invoked anyway in TezTask. Doing it early to initialize triggers for non-pool tez session.
- tezSessionPoolManager = TezSessionPoolManager.getInstance();
- tezSessionPoolManager.initTriggers(hiveConf);
- if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) {
- tezSessionPoolManager.setupPool(hiveConf);
- }
} catch (Throwable t) {
LOG.warn("Could not initiate the HiveServer2 Metrics system. Metrics may not be reported.", t);
}
@@ -191,15 +203,12 @@ public class HiveServer2 extends CompositeService {
LlapRegistryService.getClient(hiveConf);
}
- Hive sessionHive = null;
try {
sessionHive = Hive.get(hiveConf);
} catch (HiveException e) {
throw new RuntimeException("Failed to get metastore connection", e);
}
- initializeWorkloadManagement(hiveConf, sessionHive);
-
// Create views registry
HiveMaterializedViewsRegistry.get().init();
@@ -212,10 +221,17 @@ public class HiveServer2 extends CompositeService {
}
}
+ wmQueue = hiveConf.get(ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE.varname, "").trim();
+
+ this.serviceDiscovery = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY);
+ this.activePassiveHA = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE);
+
// Setup web UI
+ final int webUIPort;
+ final String webHost;
try {
- int webUIPort =
- hiveConf.getIntVar(ConfVars.HIVE_SERVER2_WEBUI_PORT);
+ webUIPort = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_WEBUI_PORT);
+ webHost = hiveConf.getVar(ConfVars.HIVE_SERVER2_WEBUI_BIND_HOST);
// We disable web UI in tests unless the test is explicitly setting a
// unique web ui port so that we don't mess up ptests.
boolean uiDisabledInTest = hiveConf.getBoolVar(ConfVars.HIVE_IN_TEST) &&
@@ -229,7 +245,7 @@ public class HiveServer2 extends CompositeService {
LOG.info("Starting Web UI on port "+ webUIPort);
HttpServer.Builder builder = new HttpServer.Builder("hiveserver2");
builder.setPort(webUIPort).setConf(hiveConf);
- builder.setHost(hiveConf.getVar(ConfVars.HIVE_SERVER2_WEBUI_BIND_HOST));
+ builder.setHost(webHost);
builder.setMaxThreads(
hiveConf.getIntVar(ConfVars.HIVE_SERVER2_WEBUI_MAX_THREADS));
builder.setAdmins(hiveConf.getVar(ConfVars.USERS_IN_ADMIN_ROLE));
@@ -281,6 +297,12 @@ public class HiveServer2 extends CompositeService {
throw new IllegalArgumentException(ConfVars.HIVE_SERVER2_WEBUI_USE_SSL.varname + " has false value. It is recommended to set to true when PAM is used.");
}
}
+ if (serviceDiscovery && activePassiveHA) {
+ builder.setContextAttribute("hs2.isLeader", isLeader);
+ builder.setContextAttribute("hiveconf", hiveConf);
+ builder.addServlet("leader", HS2LeadershipStatus.class);
+ builder.addServlet("peers", HS2Peers.class);
+ }
builder.addServlet("llap", LlapServlet.class);
builder.addServlet("jdbcjar", JdbcJarDownloadServlet.class);
builder.setContextRootRewriteTarget("/hiveserver2.jsp");
@@ -292,57 +314,25 @@ public class HiveServer2 extends CompositeService {
} catch (IOException ie) {
throw new ServiceException(ie);
}
- // Add a shutdown hook for catching SIGTERM & SIGINT
- ShutdownHookManager.addShutdownHook(new Runnable() {
- @Override
- public void run() {
- hiveServer2.stop();
- }
- });
- }
- private void initializeWorkloadManagement(HiveConf hiveConf, Hive sessionHive) {
- String wmQueue = HiveConf.getVar(hiveConf, ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE);
- boolean hasQueue = wmQueue != null && !wmQueue.isEmpty();
- WMFullResourcePlan resourcePlan;
try {
- resourcePlan = sessionHive.getActiveResourcePlan();
- } catch (Throwable e) {
- if (!HiveConf.getBoolVar(hiveConf, ConfVars.HIVE_IN_TEST_SSL)) {
- throw new RuntimeException(e);
- } else {
- resourcePlan = null; // Ignore errors in SSL tests where the connection is misconfigured.
- }
- }
- if (hasQueue && resourcePlan == null
- && HiveConf.getBoolVar(hiveConf, ConfVars.HIVE_IN_TEST)) {
- LOG.info("Creating a default resource plan for test");
- resourcePlan = createTestResourcePlan();
- }
- if (resourcePlan == null) {
- if (!hasQueue) {
- LOG.info("Workload management is not enabled and there's no resource plan");
- return; // TODO: we could activate it anyway, similar to the below; in case someone
- // wants to activate a resource plan for Tez triggers only w/o restart.
- }
- LOG.warn("Workload management is enabled but there's no resource plan");
- }
-
- if (hasQueue) {
- // Initialize workload management.
- LOG.info("Initializing workload management");
- try {
- wm = WorkloadManager.create(wmQueue, hiveConf, resourcePlan);
- } catch (ExecutionException | InterruptedException e) {
- throw new ServiceException("Unable to instantiate Workload Manager", e);
+ if (serviceDiscovery) {
+ serviceUri = getServerInstanceURI();
+ addConfsToPublish(hiveConf, confsToPublish, serviceUri);
+ if (activePassiveHA) {
+ hiveConf.set(INSTANCE_URI_CONFIG, serviceUri);
+ leaderLatchListener = new HS2LeaderLatchListener(this, SessionState.get());
+ leaderActionsExecutorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("Leader Actions Handler Thread").build());
+ hs2HARegistry = HS2ActivePassiveHARegistry.create(hiveConf, false);
+ }
}
+ } catch (Exception e) {
+ throw new ServiceException(e);
}
- if (resourcePlan != null) {
- tezSessionPoolManager.updateTriggers(resourcePlan);
- LOG.info("Updated tez session pool manager with active resource plan: {}",
- resourcePlan.getPlan().getName());
- }
+ // Add a shutdown hook for catching SIGTERM & SIGINT
+ ShutdownHookManager.addShutdownHook(() -> hiveServer2.stop());
}
private WMFullResourcePlan createTestResourcePlan() {
@@ -356,10 +346,10 @@ public class HiveServer2 extends CompositeService {
return resourcePlan;
}
- public static boolean isHTTPTransportMode(HiveConf hiveConf) {
+ public static boolean isHTTPTransportMode(Configuration hiveConf) {
String transportMode = System.getenv("HIVE_SERVER2_TRANSPORT_MODE");
if (transportMode == null) {
- transportMode = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE);
+ transportMode = hiveConf.get(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname);
}
if (transportMode != null && (transportMode.equalsIgnoreCase("http"))) {
return true;
@@ -367,8 +357,8 @@ public class HiveServer2 extends CompositeService {
return false;
}
- public static boolean isKerberosAuthMode(HiveConf hiveConf) {
- String authMode = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION);
+ public static boolean isKerberosAuthMode(Configuration hiveConf) {
+ String authMode = hiveConf.get(ConfVars.HIVE_SERVER2_AUTHENTICATION.varname);
if (authMode != null && (authMode.equalsIgnoreCase("KERBEROS"))) {
return true;
}
@@ -408,7 +398,7 @@ public class HiveServer2 extends CompositeService {
* @param hiveConf
* @throws Exception
*/
- private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception {
+ private void addServerInstanceToZooKeeper(HiveConf hiveConf, Map<String, String> confsToPublish) throws Exception {
String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf);
String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
String instanceURI = getServerInstanceURI();
@@ -449,8 +439,8 @@ public class HiveServer2 extends CompositeService {
if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_PUBLISH_CONFIGS)) {
// HiveServer2 configs that this instance will publish to ZooKeeper,
// so that the clients can read these and configure themselves properly.
- Map<String, String> confsToPublish = new HashMap<String, String>();
- addConfsToPublish(hiveConf, confsToPublish);
+
+ addConfsToPublish(hiveConf, confsToPublish, instanceURI);
// Publish configs for this instance as the data on the node
znodeData = Joiner.on(';').withKeyValueSeparator("=").join(confsToPublish);
} else {
@@ -467,7 +457,7 @@ public class HiveServer2 extends CompositeService {
throw new Exception("Max znode creation wait time: " + znodeCreationTimeout + "s exhausted");
}
setDeregisteredWithZooKeeper(false);
- znodePath = znode.getActualPath();
+ final String znodePath = znode.getActualPath();
// Set a watch on the znode
if (zooKeeperClient.checkExists().usingWatcher(new DeRegisterWatcher()).forPath(znodePath) == null) {
// No node exists, throw exception
@@ -487,10 +477,12 @@ public class HiveServer2 extends CompositeService {
* Add conf keys, values that HiveServer2 will publish to ZooKeeper.
* @param hiveConf
*/
- private void addConfsToPublish(HiveConf hiveConf, Map<String, String> confsToPublish) {
+ private void addConfsToPublish(HiveConf hiveConf, Map<String, String> confsToPublish, String serviceUri) {
// Hostname
confsToPublish.put(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname,
hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST));
+ // Hostname:port
+ confsToPublish.put(INSTANCE_URI_CONFIG, serviceUri);
// Transport mode
confsToPublish.put(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname,
hiveConf.getVar(ConfVars.HIVE_SERVER2_TRANSPORT_MODE));
@@ -540,6 +532,11 @@ public class HiveServer2 extends CompositeService {
}
}
+ public boolean isLeader() {
+ return isLeader.get();
+ }
+
+
/**
* The watcher class which sets the de-register flag when the znode corresponding to this server
* instance is deleted. Additionally, it shuts down the server if there are no more active client
@@ -609,10 +606,16 @@ public class HiveServer2 extends CompositeService {
super.start();
// If we're supporting dynamic service discovery, we'll add the service uri for this
// HiveServer2 instance to Zookeeper as a znode.
- HiveConf hiveConf = this.getHiveConf();
- if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
+ HiveConf hiveConf = getHiveConf();
+ if (serviceDiscovery) {
try {
- addServerInstanceToZooKeeper(hiveConf);
+ if (activePassiveHA) {
+ hs2HARegistry.registerLeaderLatchListener(leaderLatchListener, leaderActionsExecutorService);
+ hs2HARegistry.start();
+ LOG.info("HS2 HA registry started");
+ } else {
+ addServerInstanceToZooKeeper(hiveConf, confsToPublish);
+ }
} catch (Exception e) {
LOG.error("Error adding this HiveServer2 instance to ZooKeeper: ", e);
throw new ServiceException(e);
@@ -627,22 +630,125 @@ public class HiveServer2 extends CompositeService {
throw new ServiceException(e);
}
}
+
+ if (!activePassiveHA) {
+ LOG.info("HS2 interactive HA not enabled. Starting tez sessions..");
+ startOrReconnectTezSessions();
+ } else {
+ LOG.info("HS2 interactive HA enabled. Tez sessions will be started/reconnected by the leader.");
+ }
+ }
+
+ private static class HS2LeaderLatchListener implements LeaderLatchListener {
+ private HiveServer2 hiveServer2;
+ private SessionState parentSession;
+
+ HS2LeaderLatchListener(final HiveServer2 hs2, final SessionState parentSession) {
+ this.hiveServer2 = hs2;
+ this.parentSession = parentSession;
+ }
+
+ // leadership status change happens inside synchronized methods LeaderLatch.setLeadership().
+ // Also we use single threaded executor service for handling notifications which guarantees ordering for
+ // notification handling. if a leadership status change happens when tez sessions are getting created,
+ // the notLeader notification will get queued in executor service.
+ @Override
+ public void isLeader() {
+ LOG.info("HS2 instance {} became the LEADER. Starting/Reconnecting tez sessions..", hiveServer2.serviceUri);
+ hiveServer2.isLeader.set(true);
+ if (parentSession != null) {
+ SessionState.setCurrentSessionState(parentSession);
+ }
+ hiveServer2.startOrReconnectTezSessions();
+ LOG.info("Started/Reconnected tez sessions.");
+ }
+
+ @Override
+ public void notLeader() {
+ LOG.info("HS2 instance {} LOST LEADERSHIP. Stopping/Disconnecting tez sessions..", hiveServer2.serviceUri);
+ hiveServer2.isLeader.set(false);
+ hiveServer2.stopOrDisconnectTezSessions();
+ LOG.info("Stopped/Disconnected tez sessions.");
+ }
+ }
+
+ private void startOrReconnectTezSessions() {
+ LOG.info("Starting/Reconnecting tez sessions..");
+ // TODO: add tez session reconnect after TEZ-3875
+ WMFullResourcePlan resourcePlan = null;
+ if (!StringUtils.isEmpty(wmQueue)) {
+ try {
+ resourcePlan = sessionHive.getActiveResourcePlan();
+ } catch (HiveException e) {
+ if (!HiveConf.getBoolVar(hiveConf, ConfVars.HIVE_IN_TEST_SSL)) {
+ throw new RuntimeException(e);
+ } else {
+ resourcePlan = null; // Ignore errors in SSL tests where the connection is misconfigured.
+ }
+ }
+
+ if (resourcePlan == null && HiveConf.getBoolVar(hiveConf, ConfVars.HIVE_IN_TEST)) {
+ LOG.info("Creating a default resource plan for test");
+ resourcePlan = createTestResourcePlan();
+ }
+ }
+ initAndStartTezSessionPoolManager(resourcePlan);
+ initAndStartWorkloadManager(resourcePlan);
+ }
+
+ private void initAndStartTezSessionPoolManager(final WMFullResourcePlan resourcePlan) {
+ // starting Tez session pool in start here to let parent session state initialize on CliService state, to avoid
+ // SessionState.get() return null during createTezDir
+ try {
+ // will be invoked anyway in TezTask. Doing it early to initialize triggers for non-pool tez session.
+ LOG.info("Initializing tez session pool manager");
+ tezSessionPoolManager = TezSessionPoolManager.getInstance();
+ if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) {
+ tezSessionPoolManager.setupPool(hiveConf);
+ }
+ tezSessionPoolManager.startPool(hiveConf, resourcePlan);
+ LOG.info("Tez session pool manager initialized.");
+ } catch (Exception e) {
+ throw new ServiceException("Unable to setup tez session pool", e);
+ }
+ }
+
+ private void initAndStartWorkloadManager(final WMFullResourcePlan resourcePlan) {
+ if (!StringUtils.isEmpty(wmQueue)) {
+ // Initialize workload management.
+ LOG.info("Initializing workload management");
+ try {
+ wm = WorkloadManager.create(wmQueue, hiveConf, resourcePlan);
+ wm.start();
+ LOG.info("Workload manager initialized.");
+ } catch (Exception e) {
+ throw new ServiceException("Unable to instantiate and start Workload Manager", e);
+ }
+ } else {
+ LOG.info("Workload management is not enabled.");
+ }
+ }
+
+ private void stopOrDisconnectTezSessions() {
+ LOG.info("Stoppping/Disconnecting tez sessions.");
+ // There should already be an instance of the session pool manager.
+ // If not, ignoring is fine while stopping HiveServer2.
if (tezSessionPoolManager != null) {
try {
- tezSessionPoolManager.startPool();
- LOG.info("Started tez session pool manager..");
+ tezSessionPoolManager.stop();
+ LOG.info("Stopped tez session pool manager.");
} catch (Exception e) {
- LOG.error("Error starting tez session pool manager: ", e);
- throw new ServiceException(e);
+ LOG.error("Tez session pool manager stop had an error during stop of HiveServer2. "
+ + "Shutting down HiveServer2 anyway.", e);
}
}
if (wm != null) {
try {
- wm.start();
- LOG.info("Started workload manager..");
+ wm.stop();
+ LOG.info("Stopped workload manager.");
} catch (Exception e) {
- LOG.error("Error starting workload manager", e);
- throw new ServiceException(e);
+ LOG.error("Workload manager stop had an error during stop of HiveServer2. "
+ + "Shutting down HiveServer2 anyway.", e);
}
}
}
@@ -652,6 +758,12 @@ public class HiveServer2 extends CompositeService {
LOG.info("Shutting down HiveServer2");
HiveConf hiveConf = this.getHiveConf();
super.stop();
+ if (hs2HARegistry != null) {
+ hs2HARegistry.stop();
+ shutdownExecutor(leaderActionsExecutorService);
+ LOG.info("HS2 HA registry stopped");
+ hs2HARegistry = null;
+ }
if (webServer != null) {
try {
webServer.stop();
@@ -670,32 +782,15 @@ public class HiveServer2 extends CompositeService {
}
}
// Remove this server instance from ZooKeeper if dynamic service discovery is set
- if (hiveConf != null && hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
+ if (serviceDiscovery && !activePassiveHA) {
try {
removeServerInstanceFromZooKeeper();
} catch (Exception e) {
LOG.error("Error removing znode for this HiveServer2 instance from ZooKeeper.", e);
}
}
- // There should already be an instance of the session pool manager.
- // If not, ignoring is fine while stopping HiveServer2.
- if (hiveConf != null && hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS) &&
- tezSessionPoolManager != null) {
- try {
- tezSessionPoolManager.stop();
- } catch (Exception e) {
- LOG.error("Tez session pool manager stop had an error during stop of HiveServer2. "
- + "Shutting down HiveServer2 anyway.", e);
- }
- }
- if (wm != null) {
- try {
- wm.stop();
- } catch (Exception e) {
- LOG.error("Workload manager stop had an error during stop of HiveServer2. "
- + "Shutting down HiveServer2 anyway.", e);
- }
- }
+
+ stopOrDisconnectTezSessions();
if (hiveConf != null && hiveConf.getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
try {
@@ -706,6 +801,22 @@ public class HiveServer2 extends CompositeService {
}
}
+ private void shutdownExecutor(final ExecutorService leaderActionsExecutorService) {
+ leaderActionsExecutorService.shutdown();
+ try {
+ if (!leaderActionsExecutorService.awaitTermination(SHUTDOWN_TIME, TimeUnit.SECONDS)) {
+ LOG.warn("Executor service did not terminate in the specified time {} sec", SHUTDOWN_TIME);
+ List<Runnable> droppedTasks = leaderActionsExecutorService.shutdownNow();
+ LOG.warn("Executor service was abruptly shut down. " + droppedTasks.size() + " tasks will not be executed.");
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("Executor service did not terminate in the specified time {} sec. Exception: {}", SHUTDOWN_TIME,
+ e.getMessage());
+ List<Runnable> droppedTasks = leaderActionsExecutorService.shutdownNow();
+ LOG.warn("Executor service was abruptly shut down. " + droppedTasks.size() + " tasks will not be executed.");
+ }
+ }
+
@VisibleForTesting
public static void scheduleClearDanglingScratchDir(HiveConf hiveConf, int initialWaitInSec) {
if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_CLEAR_DANGLING_SCRATCH_DIR)) {
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/service/src/java/org/apache/hive/service/server/HiveServer2HAInstanceSet.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2HAInstanceSet.java b/service/src/java/org/apache/hive/service/server/HiveServer2HAInstanceSet.java
new file mode 100644
index 0000000..b31d63c
--- /dev/null
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2HAInstanceSet.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.server;
+
+import org.apache.hadoop.hive.registry.ServiceInstanceSet;
+
+public interface HiveServer2HAInstanceSet extends ServiceInstanceSet<HiveServer2Instance> {
+
+ /**
+ * In Active/Passive setup, returns current active leader.
+ *
+ * @return leader instance
+ */
+ HiveServer2Instance getLeader();
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/service/src/java/org/apache/hive/service/server/HiveServer2Instance.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2Instance.java b/service/src/java/org/apache/hive/service/server/HiveServer2Instance.java
new file mode 100644
index 0000000..558e809
--- /dev/null
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2Instance.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.server;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.registry.impl.ServiceInstanceBase;
+import org.apache.hadoop.registry.client.types.Endpoint;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+
+import com.google.common.base.Preconditions;
+
+public class HiveServer2Instance extends ServiceInstanceBase {
+ private boolean isLeader;
+ private String transportMode;
+ private String httpEndpoint;
+
+ // empty c'tor to make jackson happy
+ public HiveServer2Instance() {
+
+ }
+
+ public HiveServer2Instance(final ServiceRecord srv, final String endPointName) throws IOException {
+ super(srv, endPointName);
+
+ Endpoint activeEndpoint = srv.getInternalEndpoint(HS2ActivePassiveHARegistry.ACTIVE_ENDPOINT);
+ Endpoint passiveEndpoint = srv.getInternalEndpoint(HS2ActivePassiveHARegistry.PASSIVE_ENDPOINT);
+ this.isLeader = activeEndpoint != null;
+ Preconditions.checkArgument(activeEndpoint == null || passiveEndpoint == null,
+ "Incorrect service record. Both active and passive endpoints cannot be non-null!");
+ this.transportMode = srv.get(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname);
+ if (transportMode.equalsIgnoreCase("http")) {
+ this.httpEndpoint = srv.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH.varname);
+ } else {
+ this.httpEndpoint = "";
+ }
+ }
+
+ public boolean isLeader() {
+ return isLeader;
+ }
+
+ public String getTransportMode() {
+ return transportMode;
+ }
+
+ public String getHttpEndpoint() {
+ return httpEndpoint;
+ }
+
+ public void setLeader(final boolean leader) {
+ isLeader = leader;
+ }
+
+ public void setTransportMode(final String transportMode) {
+ this.transportMode = transportMode;
+ }
+
+ public void setHttpEndpoint(final String httpEndpoint) {
+ this.httpEndpoint = httpEndpoint;
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ HiveServer2Instance other = (HiveServer2Instance) o;
+ return super.equals(o) && isLeader == other.isLeader
+ && Objects.equals(transportMode, other.transportMode)
+ && Objects.equals(httpEndpoint, other.httpEndpoint);
+ }
+
+ @Override
+ public int hashCode() {
+ return super.hashCode() + Objects.hashCode(isLeader) + Objects.hashCode(transportMode) + Objects.hashCode(httpEndpoint);
+ }
+
+ @Override
+ public String toString() {
+ String result = "instanceId: " + getWorkerIdentity() + " isLeader: " + isLeader + " host: " + getHost() +
+ " port: " + getRpcPort() + " transportMode: " + transportMode;
+ if (httpEndpoint != null) {
+ result += " httpEndpoint: " + httpEndpoint;
+ }
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/service/src/java/org/apache/hive/service/servlet/HS2LeadershipStatus.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/servlet/HS2LeadershipStatus.java b/service/src/java/org/apache/hive/service/servlet/HS2LeadershipStatus.java
new file mode 100644
index 0000000..921a23e
--- /dev/null
+++ b/service/src/java/org/apache/hive/service/servlet/HS2LeadershipStatus.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.service.servlet;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Returns 200 if this HS2 instance is leader.
+ */
+public class HS2LeadershipStatus extends HttpServlet {
+ private static final Logger LOG = LoggerFactory.getLogger(HS2LeadershipStatus.class);
+
+ @Override
+ public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
+ ServletContext ctx = getServletContext();
+ AtomicBoolean isLeader = (AtomicBoolean) ctx.getAttribute("hs2.isLeader");
+ LOG.info("Returning isLeader: {}", isLeader);
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.writerWithDefaultPrettyPrinter().writeValue(response.getWriter(), isLeader);
+ response.setStatus(HttpServletResponse.SC_OK);
+ response.flushBuffer();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/service/src/java/org/apache/hive/service/servlet/HS2Peers.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/servlet/HS2Peers.java b/service/src/java/org/apache/hive/service/servlet/HS2Peers.java
new file mode 100644
index 0000000..a51bbeb
--- /dev/null
+++ b/service/src/java/org/apache/hive/service/servlet/HS2Peers.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.service.servlet;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.service.server.HS2ActivePassiveHARegistry;
+import org.apache.hive.service.server.HS2ActivePassiveHARegistryClient;
+import org.apache.hive.service.server.HiveServer2Instance;
+import org.codehaus.jackson.annotate.JsonAutoDetect;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+
+/**
+ * Returns all HS2 instances in Active-Passive standy modes.
+ */
+public class HS2Peers extends HttpServlet {
+ public static class HS2Instances {
+ private Collection<HiveServer2Instance> hiveServer2Instances;
+
+ // empty c'tor to make jackson happy
+ public HS2Instances() {
+ }
+
+ public HS2Instances(final Collection<HiveServer2Instance> hiveServer2Instances) {
+ this.hiveServer2Instances = hiveServer2Instances;
+ }
+
+ public Collection<HiveServer2Instance> getHiveServer2Instances() {
+ return hiveServer2Instances;
+ }
+
+ public void setHiveServer2Instances(final Collection<HiveServer2Instance> hiveServer2Instances) {
+ this.hiveServer2Instances = hiveServer2Instances;
+ }
+ }
+
+ @Override
+ public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
+ ServletContext ctx = getServletContext();
+ HiveConf hiveConf = (HiveConf) ctx.getAttribute("hiveconf");
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.configure(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS, false);
+ // serialize json based on field annotations only
+ mapper.setVisibilityChecker(mapper.getSerializationConfig().getDefaultVisibilityChecker()
+ .withSetterVisibility(JsonAutoDetect.Visibility.NONE));
+ HS2ActivePassiveHARegistry hs2Registry = HS2ActivePassiveHARegistryClient.getClient(hiveConf);
+ HS2Instances instances = new HS2Instances(hs2Registry.getAll());
+ mapper.writerWithDefaultPrettyPrinter().writeValue(response.getWriter(), instances);
+ response.setStatus(HttpServletResponse.SC_OK);
+ response.flushBuffer();
+ }
+}
[2/2] hive git commit: HIVE-18281: HiveServer2 HA for LLAP and
Workload Manager (Prasanth Jayachandran reviewed by Sergey Shelukhin)
Posted by pr...@apache.org.
HIVE-18281: HiveServer2 HA for LLAP and Workload Manager (Prasanth Jayachandran reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/21c6a540
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/21c6a540
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/21c6a540
Branch: refs/heads/master
Commit: 21c6a5407cebc5a096cd9aa10157be05a3ea9627
Parents: 3004335
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Thu Mar 15 23:01:25 2018 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Thu Mar 15 23:01:25 2018 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 11 +
itests/hive-minikdc/pom.xml | 20 ++
itests/hive-unit-hadoop2/pom.xml | 21 +-
.../apache/hive/jdbc/TestActivePassiveHA.java | 268 +++++++++++++++
.../hive/jdbc/miniHS2/AbstractHiveService.java | 8 +-
.../org/apache/hive/jdbc/miniHS2/MiniHS2.java | 7 +
.../hive/llap/registry/ServiceRegistry.java | 11 +-
.../registry/impl/LlapFixedRegistryImpl.java | 4 +-
.../llap/registry/impl/LlapRegistryService.java | 5 +-
.../impl/LlapZookeeperRegistryImpl.java | 12 +-
.../hive/llap/security/LlapTokenClient.java | 3 +-
.../hadoop/hive/registry/RegistryUtilities.java | 52 +++
.../hadoop/hive/registry/ServiceInstance.java | 18 +-
.../hive/registry/ServiceInstanceSet.java | 12 +-
.../hive/registry/impl/ServiceInstanceBase.java | 57 +++-
.../hive/registry/impl/TezAmInstance.java | 21 +-
.../hive/registry/impl/TezAmRegistryImpl.java | 8 +-
.../hive/registry/impl/ZkRegistryBase.java | 242 +++++++++-----
.../hadoop/hive/llap/LlapBaseInputFormat.java | 5 +-
.../daemon/services/impl/LlapWebServices.java | 5 +-
.../tezplugins/LlapTaskSchedulerService.java | 10 +
.../hive/ql/exec/tez/TezSessionPoolManager.java | 9 +-
.../hive/ql/exec/tez/TezSessionPoolSession.java | 11 +-
.../hive/ql/exec/tez/TezSessionState.java | 5 +-
.../apache/hadoop/hive/ql/exec/tez/Utils.java | 3 +-
.../physical/LlapClusterStateForCompile.java | 3 +-
.../hive/ql/exec/tez/TestTezSessionPool.java | 13 +-
service/pom.xml | 43 ++-
.../server/HS2ActivePassiveHARegistry.java | 325 +++++++++++++++++++
.../HS2ActivePassiveHARegistryClient.java | 54 +++
.../apache/hive/service/server/HiveServer2.java | 313 ++++++++++++------
.../server/HiveServer2HAInstanceSet.java | 29 ++
.../service/server/HiveServer2Instance.java | 108 ++++++
.../service/servlet/HS2LeadershipStatus.java | 48 +++
.../apache/hive/service/servlet/HS2Peers.java | 75 +++++
35 files changed, 1559 insertions(+), 280 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 8bbf1be..06efd02 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1916,6 +1916,10 @@ public class HiveConf extends Configuration {
new TimeValidator(TimeUnit.MILLISECONDS),
"ZooKeeper client's session timeout (in milliseconds). The client is disconnected, and as a result, all locks released, \n" +
"if a heartbeat is not sent in the timeout."),
+ HIVE_ZOOKEEPER_CONNECTION_TIMEOUT("hive.zookeeper.connection.timeout", "15s",
+ new TimeValidator(TimeUnit.SECONDS),
+ "ZooKeeper client's connection timeout in seconds. Connection timeout * hive.zookeeper.connection.max.retries\n" +
+ "with exponential backoff is when curator client deems connection is lost to zookeeper."),
HIVE_ZOOKEEPER_NAMESPACE("hive.zookeeper.namespace", "hive_zookeeper_namespace",
"The parent node under which all ZooKeeper nodes are created."),
HIVE_ZOOKEEPER_CLEAN_EXTRA_NODES("hive.zookeeper.clean.extra.nodes", false,
@@ -2465,6 +2469,13 @@ public class HiveConf extends Configuration {
"If true, the HiveServer2 WebUI will be secured with PAM."),
// Tez session settings
+ HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE("hive.server2.active.passive.ha.enable", false,
+ "Whether HiveServer2 Active/Passive High Availability be enabled when Hive Interactive sessions are enabled." +
+ "This will also require hive.server2.support.dynamic.service.discovery to be enabled."),
+ HIVE_SERVER2_ACTIVE_PASSIVE_HA_REGISTRY_NAMESPACE("hive.server2.active.passive.ha.registry.namespace",
+ "hs2ActivePassiveHA",
+ "When HiveServer2 Active/Passive High Availability is enabled, uses this namespace for registering HS2\n" +
+ "instances with zookeeper"),
HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE("hive.server2.tez.interactive.queue", "",
"A single YARN queues to use for Hive Interactive sessions. When this is specified,\n" +
"workload management is enabled and used for these sessions."),
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/itests/hive-minikdc/pom.xml
----------------------------------------------------------------------
diff --git a/itests/hive-minikdc/pom.xml b/itests/hive-minikdc/pom.xml
index 337535a..1e40d9d 100644
--- a/itests/hive-minikdc/pom.xml
+++ b/itests/hive-minikdc/pom.xml
@@ -117,6 +117,26 @@
<scope>test</scope>
<classifier>tests</classifier>
</dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-dag</artifactId>
+ <version>${tez.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commmons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-web-proxy</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<!-- test inter-project -->
<dependency>
<groupId>junit</groupId>
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/itests/hive-unit-hadoop2/pom.xml
----------------------------------------------------------------------
diff --git a/itests/hive-unit-hadoop2/pom.xml b/itests/hive-unit-hadoop2/pom.xml
index fb31fd4..85a6145 100644
--- a/itests/hive-unit-hadoop2/pom.xml
+++ b/itests/hive-unit-hadoop2/pom.xml
@@ -53,7 +53,26 @@
<artifactId>hive-exec</artifactId>
<version>${project.version}</version>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-dag</artifactId>
+ <version>${tez.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commmons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-web-proxy</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<!-- dependencies are always listed in sorted order by groupId, artifectId -->
<!-- test intra-project -->
<dependency>
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java
new file mode 100644
index 0000000..26acbd7
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.jdbc;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.sql.Connection;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.registry.impl.ZkRegistryBase;
+import org.apache.hive.jdbc.miniHS2.MiniHS2;
+import org.apache.hive.service.server.HS2ActivePassiveHARegistry;
+import org.apache.hive.service.server.HS2ActivePassiveHARegistryClient;
+import org.apache.hive.service.server.HiveServer2Instance;
+import org.apache.hive.service.servlet.HS2Peers;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestActivePassiveHA {
+ private MiniHS2 miniHS2_1 = null;
+ private MiniHS2 miniHS2_2 = null;
+ private static TestingServer zkServer;
+ private Connection hs2Conn = null;
+ private HiveConf hiveConf1;
+ private HiveConf hiveConf2;
+
+ @BeforeClass
+ public static void beforeTest() throws Exception {
+ MiniHS2.cleanupLocalDir();
+ zkServer = new TestingServer();
+ Class.forName(MiniHS2.getJdbcDriverName());
+ }
+
+ @AfterClass
+ public static void afterTest() throws Exception {
+ if (zkServer != null) {
+ zkServer.close();
+ zkServer = null;
+ }
+ MiniHS2.cleanupLocalDir();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ hiveConf1 = new HiveConf();
+ hiveConf1.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+ // Set up zookeeper dynamic service discovery configs
+ setHAConfigs(hiveConf1);
+ miniHS2_1 = new MiniHS2.Builder().withConf(hiveConf1).cleanupLocalDirOnStartup(false).build();
+ hiveConf2 = new HiveConf();
+ hiveConf2.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+ // Set up zookeeper dynamic service discovery configs
+ setHAConfigs(hiveConf2);
+ miniHS2_2 = new MiniHS2.Builder().withConf(hiveConf2).cleanupLocalDirOnStartup(false).build();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (hs2Conn != null) {
+ hs2Conn.close();
+ }
+ if ((miniHS2_1 != null) && miniHS2_1.isStarted()) {
+ miniHS2_1.stop();
+ }
+ if ((miniHS2_2 != null) && miniHS2_2.isStarted()) {
+ miniHS2_2.stop();
+ }
+ }
+
+ private static void setHAConfigs(Configuration conf) {
+ conf.setBoolean(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY.varname, true);
+ conf.set(ConfVars.HIVE_ZOOKEEPER_QUORUM.varname, zkServer.getConnectString());
+ final String zkRootNamespace = "hs2test";
+ conf.set(ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE.varname, zkRootNamespace);
+ conf.setBoolean(ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE.varname, true);
+ conf.setTimeDuration(ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT.varname, 2, TimeUnit.SECONDS);
+ conf.setTimeDuration(ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME.varname, 100, TimeUnit.MILLISECONDS);
+ conf.setInt(ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES.varname, 1);
+ }
+
+ @Test(timeout = 60000)
+ public void testActivePassive() throws Exception {
+ Map<String, String> confOverlay = new HashMap<>();
+ hiveConf1.set(ZkRegistryBase.UNIQUE_IDENTIFIER, UUID.randomUUID().toString());
+ miniHS2_1.start(confOverlay);
+ while(!miniHS2_1.isStarted()) {
+ Thread.sleep(100);
+ }
+
+ hiveConf2.set(ZkRegistryBase.UNIQUE_IDENTIFIER, UUID.randomUUID().toString());
+ miniHS2_2.start(confOverlay);
+ while(!miniHS2_2.isStarted()) {
+ Thread.sleep(100);
+ }
+
+ assertEquals(true, miniHS2_1.isLeader());
+ String url = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader";
+ assertEquals("true", sendGet(url));
+
+ assertEquals(false, miniHS2_2.isLeader());
+ url = "http://localhost:" + hiveConf2.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader";
+ assertEquals("false", sendGet(url));
+
+ url = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/peers";
+ String resp = sendGet(url);
+ ObjectMapper objectMapper = new ObjectMapper();
+ HS2Peers.HS2Instances hs2Peers = objectMapper.readValue(resp, HS2Peers.HS2Instances.class);
+ int port1 = Integer.parseInt(hiveConf1.get(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname));
+ assertEquals(2, hs2Peers.getHiveServer2Instances().size());
+ for (HiveServer2Instance hsi : hs2Peers.getHiveServer2Instances()) {
+ if (hsi.getRpcPort() == port1) {
+ assertEquals(true, hsi.isLeader());
+ } else {
+ assertEquals(false, hsi.isLeader());
+ }
+ }
+
+ Configuration conf = new Configuration();
+ setHAConfigs(conf);
+ HS2ActivePassiveHARegistry client = HS2ActivePassiveHARegistryClient.getClient(conf);
+ List<HiveServer2Instance> hs2Instances = new ArrayList<>(client.getAll());
+ assertEquals(2, hs2Instances.size());
+ List<HiveServer2Instance> leaders = new ArrayList<>();
+ List<HiveServer2Instance> standby = new ArrayList<>();
+ for (HiveServer2Instance instance : hs2Instances) {
+ if (instance.isLeader()) {
+ leaders.add(instance);
+ } else {
+ standby.add(instance);
+ }
+ }
+ assertEquals(1, leaders.size());
+ assertEquals(1, standby.size());
+
+ miniHS2_1.stop();
+
+ while(!miniHS2_2.isStarted()) {
+ Thread.sleep(100);
+ }
+ assertEquals(true, miniHS2_2.isLeader());
+ url = "http://localhost:" + hiveConf2.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader";
+ assertEquals("true", sendGet(url));
+
+ while (client.getAll().size() != 1) {
+ Thread.sleep(100);
+ }
+
+ client = HS2ActivePassiveHARegistryClient.getClient(conf);
+ hs2Instances = new ArrayList<>(client.getAll());
+ assertEquals(1, hs2Instances.size());
+ leaders = new ArrayList<>();
+ standby = new ArrayList<>();
+ for (HiveServer2Instance instance : hs2Instances) {
+ if (instance.isLeader()) {
+ leaders.add(instance);
+ } else {
+ standby.add(instance);
+ }
+ }
+ assertEquals(1, leaders.size());
+ assertEquals(0, standby.size());
+
+ url = "http://localhost:" + hiveConf2.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/peers";
+ resp = sendGet(url);
+ objectMapper = new ObjectMapper();
+ hs2Peers = objectMapper.readValue(resp, HS2Peers.HS2Instances.class);
+ int port2 = Integer.parseInt(hiveConf2.get(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname));
+ assertEquals(1, hs2Peers.getHiveServer2Instances().size());
+ for (HiveServer2Instance hsi : hs2Peers.getHiveServer2Instances()) {
+ if (hsi.getRpcPort() == port2) {
+ assertEquals(true, hsi.isLeader());
+ } else {
+ assertEquals(false, hsi.isLeader());
+ }
+ }
+
+ // start 1st server again
+ hiveConf1.set(ZkRegistryBase.UNIQUE_IDENTIFIER, UUID.randomUUID().toString());
+ miniHS2_1.start(confOverlay);
+
+ while(!miniHS2_1.isStarted()) {
+ Thread.sleep(100);
+ }
+ assertEquals(false, miniHS2_1.isLeader());
+ url = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader";
+ assertEquals("false", sendGet(url));
+
+ while (client.getAll().size() != 2) {
+ Thread.sleep(100);
+ }
+
+ client = HS2ActivePassiveHARegistryClient.getClient(conf);
+ hs2Instances = new ArrayList<>(client.getAll());
+ assertEquals(2, hs2Instances.size());
+ leaders = new ArrayList<>();
+ standby = new ArrayList<>();
+ for (HiveServer2Instance instance : hs2Instances) {
+ if (instance.isLeader()) {
+ leaders.add(instance);
+ } else {
+ standby.add(instance);
+ }
+ }
+ assertEquals(1, leaders.size());
+ assertEquals(1, standby.size());
+
+ url = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/peers";
+ resp = sendGet(url);
+ objectMapper = new ObjectMapper();
+ hs2Peers = objectMapper.readValue(resp, HS2Peers.HS2Instances.class);
+ port2 = Integer.parseInt(hiveConf2.get(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname));
+ assertEquals(2, hs2Peers.getHiveServer2Instances().size());
+ for (HiveServer2Instance hsi : hs2Peers.getHiveServer2Instances()) {
+ if (hsi.getRpcPort() == port2) {
+ assertEquals(true, hsi.isLeader());
+ } else {
+ assertEquals(false, hsi.isLeader());
+ }
+ }
+ }
+
+ private String sendGet(String url) throws Exception {
+ URL obj = new URL(url);
+ HttpURLConnection con = (HttpURLConnection) obj.openConnection();
+ con.setRequestMethod("GET");
+ BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()));
+ String inputLine;
+ StringBuilder response = new StringBuilder();
+ while ((inputLine = in.readLine()) != null) {
+ response.append(inputLine);
+ }
+ in.close();
+ return response.toString();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/AbstractHiveService.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/AbstractHiveService.java b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/AbstractHiveService.java
index 6cab8cd..d21b764 100644
--- a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/AbstractHiveService.java
+++ b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/AbstractHiveService.java
@@ -35,14 +35,16 @@ public abstract class AbstractHiveService {
private String hostname;
private int binaryPort;
private int httpPort;
+ private int webPort;
private boolean startedHiveService = false;
private List<String> addedProperties = new ArrayList<String>();
- public AbstractHiveService(HiveConf hiveConf, String hostname, int binaryPort, int httpPort) {
+ public AbstractHiveService(HiveConf hiveConf, String hostname, int binaryPort, int httpPort, int webPort) {
this.hiveConf = hiveConf;
this.hostname = hostname;
this.binaryPort = binaryPort;
this.httpPort = httpPort;
+ this.webPort = webPort;
}
/**
@@ -136,6 +138,10 @@ public abstract class AbstractHiveService {
return httpPort;
}
+ public int getWebPort() {
+ return webPort;
+ }
+
public boolean isStarted() {
return startedHiveService;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
index 8bbf8a4..997726c 100644
--- a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
+++ b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
@@ -217,6 +217,8 @@ public class MiniHS2 extends AbstractHiveService {
(usePortsFromConf ? hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT) : MetaStoreTestUtils
.findFreePort()),
(usePortsFromConf ? hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT) : MetaStoreTestUtils
+ .findFreePort()),
+ (usePortsFromConf ? hiveConf.getIntVar(ConfVars.HIVE_SERVER2_WEBUI_PORT) : MetaStoreTestUtils
.findFreePort()));
hiveConf.setLongVar(ConfVars.HIVE_SERVER2_MAX_START_ATTEMPTS, 3l);
hiveConf.setTimeVar(ConfVars.HIVE_SERVER2_SLEEP_INTERVAL_BETWEEN_START_ATTEMPTS, 10,
@@ -306,6 +308,7 @@ public class MiniHS2 extends AbstractHiveService {
hiveConf.setVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST, getHost());
hiveConf.setIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT, getBinaryPort());
hiveConf.setIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT, getHttpPort());
+ hiveConf.setIntVar(ConfVars.HIVE_SERVER2_WEBUI_PORT, getWebPort());
Path scratchDir = new Path(baseFsDir, "scratch");
// Create root scratchdir with write all, so that user impersonation has no issues.
@@ -404,6 +407,10 @@ public class MiniHS2 extends AbstractHiveService {
}
+ public boolean isLeader() {
+ return hiveServer2.isLeader();
+ }
+
public CLIServiceClient getServiceClient() {
verifyStarted();
return getServiceClientInternal();
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
index 5d7f813..6178b4b 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
@@ -14,13 +14,16 @@
package org.apache.hadoop.hive.llap.registry;
import java.io.IOException;
+
+import org.apache.hadoop.hive.registry.ServiceInstance;
+import org.apache.hadoop.hive.registry.ServiceInstanceSet;
import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
import org.apache.hadoop.yarn.api.records.ApplicationId;
/**
* ServiceRegistry interface for switching between fixed host and dynamic registry implementations.
*/
-public interface ServiceRegistry {
+public interface ServiceRegistry<T extends ServiceInstance> {
/**
* Start the service registry
@@ -49,14 +52,14 @@ public interface ServiceRegistry {
* @param clusterReadyTimeoutMs The time to wait for the cluster to be ready, if it's not
* started yet. 0 means do not wait.
*/
- LlapServiceInstanceSet getInstances(String component, long clusterReadyTimeoutMs) throws IOException;
+ ServiceInstanceSet<T> getInstances(String component, long clusterReadyTimeoutMs) throws
+ IOException;
/**
* Adds state change listeners for service instances.
* @param listener - state change listener
*/
- void registerStateChangeListener(
- ServiceInstanceStateChangeListener<LlapServiceInstance> listener) throws IOException;
+ void registerStateChangeListener(ServiceInstanceStateChangeListener<T> listener) throws IOException;
/**
* @return The application ID of the LLAP cluster.
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
index c88198f..f99d86c 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
+import org.apache.hadoop.hive.registry.ServiceInstance;
import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.StringUtils;
@@ -267,8 +268,7 @@ public class LlapFixedRegistryImpl implements ServiceRegistry {
}
@Override
- public void registerStateChangeListener(
- final ServiceInstanceStateChangeListener<LlapServiceInstance> listener) {
+ public void registerStateChangeListener(final ServiceInstanceStateChangeListener listener) throws IOException {
// nothing to set
LOG.warn("Callbacks for instance state changes are not supported in fixed registry.");
}
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
index 80a6aba..3bda40b 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
+import org.apache.hadoop.hive.registry.ServiceInstanceSet;
import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.service.AbstractService;
@@ -35,7 +36,7 @@ public class LlapRegistryService extends AbstractService {
private static final Logger LOG = LoggerFactory.getLogger(LlapRegistryService.class);
- private ServiceRegistry registry = null;
+ private ServiceRegistry<LlapServiceInstance> registry = null;
private final boolean isDaemon;
private boolean isDynamic = false;
private String identity = "(pending)";
@@ -136,7 +137,7 @@ public class LlapRegistryService extends AbstractService {
}
public LlapServiceInstanceSet getInstances(long clusterReadyTimeoutMs) throws IOException {
- return this.registry.getInstances("LLAP", clusterReadyTimeoutMs);
+ return (LlapServiceInstanceSet) this.registry.getInstances("LLAP", clusterReadyTimeoutMs);
}
public void registerStateChangeListener(
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
index 8339230..f5d6202 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
@@ -53,7 +53,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LlapZookeeperRegistryImpl
- extends ZkRegistryBase<LlapServiceInstance> implements ServiceRegistry {
+ extends ZkRegistryBase<LlapServiceInstance> implements ServiceRegistry<LlapServiceInstance> {
private static final Logger LOG = LoggerFactory.getLogger(LlapZookeeperRegistryImpl.class);
/**
@@ -65,8 +65,6 @@ public class LlapZookeeperRegistryImpl
private static final String IPC_LLAP = "llap";
private static final String IPC_OUTPUTFORMAT = "llapoutputformat";
private final static String NAMESPACE_PREFIX = "llap-";
- private final static String USER_SCOPE_PATH_PREFIX = "user-";
- private static final String WORKER_PREFIX = "worker-";
private static final String SLOT_PREFIX = "slot-";
private static final String SASL_LOGIN_CONTEXT_NAME = "LlapZooKeeperClient";
@@ -79,7 +77,7 @@ public class LlapZookeeperRegistryImpl
public LlapZookeeperRegistryImpl(String instanceName, Configuration conf) {
super(instanceName, conf,
HiveConf.getVar(conf, ConfVars.LLAP_ZK_REGISTRY_NAMESPACE), NAMESPACE_PREFIX,
- USER_SCOPE_PATH_PREFIX, WORKER_PREFIX,
+ USER_SCOPE_PATH_PREFIX, WORKER_PREFIX, WORKER_GROUP,
LlapProxy.isDaemon() ? SASL_LOGIN_CONTEXT_NAME : null,
HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_PRINCIPAL),
HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE),
@@ -225,7 +223,7 @@ public class LlapZookeeperRegistryImpl
@Override
public String toString() {
- return "DynamicServiceInstance [id=" + getWorkerIdentity() + ", host=" + host + ":" + rpcPort +
+ return "DynamicServiceInstance [id=" + getWorkerIdentity() + ", host=" + getHost() + ":" + getRpcPort() +
" with resources=" + getResource() + ", shufflePort=" + getShufflePort() +
", servicesAddress=" + getServicesAddress() + ", mgmtPort=" + getManagementPort() + "]";
}
@@ -327,9 +325,9 @@ public class LlapZookeeperRegistryImpl
if (data == null) continue;
String nodeName = extractNodeName(childData);
if (nodeName.startsWith(WORKER_PREFIX)) {
- Set<LlapServiceInstance> instances = getInstancesByPath(childData.getPath());
+ LlapServiceInstance instances = getInstanceByPath(childData.getPath());
if (instances != null) {
- unsorted.addAll(instances);
+ unsorted.add(instances);
}
} else if (nodeName.startsWith(SLOT_PREFIX)) {
slotByWorker.put(extractWorkerIdFromSlot(childData),
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
index 32d5caa..3208e21 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
+import org.apache.hadoop.hive.registry.ServiceInstanceSet;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
@@ -55,7 +56,7 @@ public class LlapTokenClient {
private final SocketFactory socketFactory;
private final RetryPolicy retryPolicy;
private final Configuration conf;
- private LlapServiceInstanceSet activeInstances;
+ private ServiceInstanceSet<LlapServiceInstance> activeInstances;
private Collection<LlapServiceInstance> lastKnownInstances;
private LlapManagementProtocolClientImpl client;
private LlapServiceInstance clientInstance;
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/registry/RegistryUtilities.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/RegistryUtilities.java b/llap-client/src/java/org/apache/hadoop/hive/registry/RegistryUtilities.java
new file mode 100644
index 0000000..e069e43
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/RegistryUtilities.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.registry;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.UUID;
+
+public class RegistryUtilities {
+ private static final String LOCALHOST = "localhost";
+
+ /**
+ * Will return hostname stored in InetAddress.
+ *
+ * @return hostname
+ */
+ public static String getHostName() {
+ try {
+ return InetAddress.getLocalHost().getHostName();
+ } catch (UnknownHostException e) {
+ return LOCALHOST;
+ }
+ }
+
+ /**
+ * Will return FQDN of the host after doing reverse DNS lookip.
+ *
+ * @return FQDN of host
+ */
+ public static String getCanonicalHostName() {
+ try {
+ return InetAddress.getLocalHost().getCanonicalHostName();
+ } catch (UnknownHostException e) {
+ return LOCALHOST;
+ }
+ }
+
+ public static String getUUID() {
+ return String.valueOf(UUID.randomUUID());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java
index 908b3bb..4493e99 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java
@@ -21,27 +21,27 @@ public interface ServiceInstance {
* Worker identity is a UUID (unique across restarts), to identify a node which died & was brought
* back on the same host/port
*/
- public abstract String getWorkerIdentity();
+ String getWorkerIdentity();
/**
* Hostname of the service instance
*
- * @return
+ * @return service hostname
*/
- public abstract String getHost();
+ String getHost();
/**
* RPC Endpoint for service instance
- *
- * @return
+ *
+ * @return rpc port
*/
- public int getRpcPort();
+ int getRpcPort();
/**
* Config properties of the Service Instance (llap.daemon.*)
- *
- * @return
+ *
+ * @return properties
*/
- public abstract Map<String, String> getProperties();
+ Map<String, String> getProperties();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java
index 34fba5c..63178cc 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java
@@ -29,15 +29,15 @@ public interface ServiceInstanceSet<InstanceType extends ServiceInstance> {
* The worker identity does not collide between restarts, so each restart will have a unique id,
* while having the same host/ip pair.
*
- * @return
+ * @return instance list
*/
Collection<InstanceType> getAll();
/**
* Get an instance by worker identity.
*
- * @param name
- * @return
+ * @param name worker id
+ * @return instance
*/
InstanceType getInstance(String name);
@@ -46,13 +46,13 @@ public interface ServiceInstanceSet<InstanceType extends ServiceInstance> {
*
* The list could include dead and alive instances.
*
- * @param host
- * @return
+ * @param host hostname
+ * @return instance list
*/
Set<InstanceType> getByHost(String host);
/**
- * Get number of instances in the currently availabe.
+ * Get number of instances in the currently available.
*
* @return - number of instances
*/
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java
index db3d788..de8910c 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java
@@ -15,6 +15,8 @@ package org.apache.hadoop.hive.registry.impl;
import java.io.IOException;
import java.util.Map;
+import java.util.Objects;
+
import org.apache.hadoop.hive.registry.ServiceInstance;
import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
import org.apache.hadoop.registry.client.types.AddressTypes;
@@ -25,26 +27,27 @@ import org.slf4j.LoggerFactory;
public class ServiceInstanceBase implements ServiceInstance {
private static final Logger LOG = LoggerFactory.getLogger(ServiceInstanceBase.class);
+ private String host;
+ private int rpcPort;
+ private String workerIdentity;
+ private Map<String, String> properties;
- protected final ServiceRecord srv;
- protected final String host;
- protected final int rpcPort;
+ // empty c'tor to make jackson happy
+ public ServiceInstanceBase() {
- public ServiceInstanceBase(ServiceRecord srv, String rpcName) throws IOException {
- this.srv = srv;
+ }
+ public ServiceInstanceBase(ServiceRecord srv, String rpcName) throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("Working with ServiceRecord: {}", srv);
}
-
final Endpoint rpc = srv.getInternalEndpoint(rpcName);
-
- this.host =
- RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
- AddressTypes.ADDRESS_HOSTNAME_FIELD);
- this.rpcPort =
- Integer.parseInt(RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
- AddressTypes.ADDRESS_PORT_FIELD));
+ this.host = RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
+ AddressTypes.ADDRESS_HOSTNAME_FIELD);
+ this.rpcPort = Integer.parseInt(RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
+ AddressTypes.ADDRESS_PORT_FIELD));
+ this.workerIdentity = srv.get(ZkRegistryBase.UNIQUE_IDENTIFIER);
+ this.properties = srv.attributes();
}
@Override
@@ -57,17 +60,19 @@ public class ServiceInstanceBase implements ServiceInstance {
}
ServiceInstanceBase other = (ServiceInstanceBase) o;
- return this.getWorkerIdentity().equals(other.getWorkerIdentity());
+ return Objects.equals(getWorkerIdentity(), other.getWorkerIdentity())
+ && Objects.equals(host, other.host)
+ && Objects.equals(rpcPort, other.rpcPort);
}
@Override
public int hashCode() {
- return getWorkerIdentity().hashCode();
+ return getWorkerIdentity().hashCode() + (31 * host.hashCode()) + (31 * rpcPort);
}
@Override
public String getWorkerIdentity() {
- return srv.get(ZkRegistryBase.UNIQUE_IDENTIFIER);
+ return workerIdentity;
}
@Override
@@ -82,12 +87,28 @@ public class ServiceInstanceBase implements ServiceInstance {
@Override
public Map<String, String> getProperties() {
- return srv.attributes();
+ return properties;
+ }
+
+ public void setHost(final String host) {
+ this.host = host;
+ }
+
+ public void setRpcPort(final int rpcPort) {
+ this.rpcPort = rpcPort;
+ }
+
+ public void setWorkerIdentity(final String workerIdentity) {
+ this.workerIdentity = workerIdentity;
+ }
+
+ public void setProperties(final Map<String, String> properties) {
+ this.properties = properties;
}
@Override
public String toString() {
return "DynamicServiceInstance [id=" + getWorkerIdentity() + ", host="
- + host + ":" + rpcPort + "]";
+ + host + ":" + rpcPort + "]";
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java
index 0724cf5..d09cb24 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java
@@ -13,29 +13,26 @@
*/
package org.apache.hadoop.hive.registry.impl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.IOException;
import org.apache.commons.codec.binary.Base64;
-
-import com.google.common.io.ByteStreams;
-
-import org.apache.tez.common.security.JobTokenIdentifier;
-
-import org.apache.hadoop.security.token.Token;
-
-import java.io.IOException;
import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
import org.apache.hadoop.registry.client.types.AddressTypes;
import org.apache.hadoop.registry.client.types.Endpoint;
import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.security.token.Token;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.io.ByteStreams;
public class TezAmInstance extends ServiceInstanceBase {
private static final Logger LOG = LoggerFactory.getLogger(TezAmInstance.class);
private final int pluginPort;
private Token<JobTokenIdentifier> token;
- public TezAmInstance(ServiceRecord srv) throws IOException {
+ TezAmInstance(ServiceRecord srv) throws IOException {
super(srv, TezAmRegistryImpl.IPC_TEZCLIENT);
final Endpoint plugin = srv.getInternalEndpoint(TezAmRegistryImpl.IPC_PLUGIN);
if (plugin != null) {
@@ -76,7 +73,7 @@ public class TezAmInstance extends ServiceInstanceBase {
@Override
public String toString() {
- return "TezAmInstance [" + getSessionId() + ", host=" + host + ", rpcPort=" + rpcPort +
+ return "TezAmInstance [" + getSessionId() + ", host=" + getHost() + ", rpcPort=" + getRpcPort() +
", pluginPort=" + pluginPort + ", token=" + token + "]";
}
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java
index 417e571..ab02cf4 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java
@@ -37,21 +37,19 @@ public class TezAmRegistryImpl extends ZkRegistryBase<TezAmInstance> {
static final String AM_SESSION_ID = "am.session.id", AM_PLUGIN_TOKEN = "am.plugin.token",
AM_PLUGIN_JOBID = "am.plugin.jobid";
private final static String NAMESPACE_PREFIX = "tez-am-";
- private final static String USER_SCOPE_PATH_PREFIX = "user-";
- private static final String WORKER_PREFIX = "worker-";
private static final String SASL_LOGIN_CONTEXT_NAME = "TezAmZooKeeperClient";
private final String registryName;
- public static TezAmRegistryImpl create(Configuration conf, boolean b) {
+ public static TezAmRegistryImpl create(Configuration conf, boolean useSecureZk) {
String amRegistryName = HiveConf.getVar(conf, ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME);
return StringUtils.isBlank(amRegistryName) ? null
- : new TezAmRegistryImpl(amRegistryName, conf, true);
+ : new TezAmRegistryImpl(amRegistryName, conf, useSecureZk);
}
private TezAmRegistryImpl(String instanceName, Configuration conf, boolean useSecureZk) {
- super(instanceName, conf, null, NAMESPACE_PREFIX, USER_SCOPE_PATH_PREFIX, WORKER_PREFIX,
+ super(instanceName, conf, null, NAMESPACE_PREFIX, USER_SCOPE_PATH_PREFIX, WORKER_PREFIX, WORKER_GROUP,
useSecureZk ? SASL_LOGIN_CONTEXT_NAME : null,
HiveConf.getVar(conf, ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_PRINCIPAL),
HiveConf.getVar(conf, ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_KEYTAB_FILE),
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
index 17269dd..e7227a8 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
@@ -13,13 +13,7 @@
*/
package org.apache.hadoop.hive.registry.impl;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
@@ -34,6 +28,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
@@ -44,12 +39,15 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode.Mode;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.LlapUtil;
+import org.apache.hadoop.hive.registry.RegistryUtilities;
import org.apache.hadoop.hive.registry.ServiceInstance;
import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
@@ -65,6 +63,12 @@ import org.apache.zookeeper.data.Id;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
/**
* This is currently used for implementation inheritance only; it doesn't provide a unified flow
* into which one can just plug a few abstract method implementations, because providing one with
@@ -77,16 +81,18 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
private static final Logger LOG = LoggerFactory.getLogger(ZkRegistryBase.class);
private final static String SASL_NAMESPACE = "sasl";
private final static String UNSECURE_NAMESPACE = "unsecure";
-
- static final String UNIQUE_IDENTIFIER = "registry.unique.id";
- private static final UUID uniq = UUID.randomUUID();
+ protected final static String USER_SCOPE_PATH_PREFIX = "user-";
+ protected static final String WORKER_PREFIX = "worker-";
+ protected static final String WORKER_GROUP = "workers";
+ public static final String UNIQUE_IDENTIFIER = "registry.unique.id";
+ protected static final UUID UNIQUE_ID = UUID.randomUUID();
+ private static final Joiner PATH_JOINER = Joiner.on("/").skipNulls();
protected final Configuration conf;
protected final CuratorFramework zooKeeperClient;
- // userPathPrefix is the path specific to the user for which ACLs should be restrictive.
// workersPath is the directory path where all the worker znodes are located.
protected final String workersPath;
- private final String userPathPrefix, workerNodePrefix;
+ private final String workerNodePrefix;
protected final ServiceRecordMarshal encoder; // to marshal/unmarshal znode data
@@ -99,7 +105,9 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
private final String disableMessage;
private final Lock instanceCacheLock = new ReentrantLock();
- private final Map<String, Set<InstanceType>> pathToInstanceCache;
+ // there can be only one instance per path
+ private final Map<String, InstanceType> pathToInstanceCache;
+ // there can be multiple instances per node
private final Map<String, Set<InstanceType>> nodeToInstanceCache;
// The registration znode.
@@ -109,29 +117,22 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
private PathChildrenCache instancesCache; // Created on demand.
/** Local hostname. */
- protected static final String hostname;
- static {
- String localhost = "localhost";
- try {
- localhost = InetAddress.getLocalHost().getCanonicalHostName();
- } catch (UnknownHostException uhe) {
- // ignore
- }
- hostname = localhost;
- }
+ protected static final String hostname = RegistryUtilities.getCanonicalHostName();
/**
* @param rootNs A single root namespace override. Not recommended.
- * @param nsPrefix The namespace prefix to use with default namespaces.
+ * @param nsPrefix The namespace prefix to use with default namespaces (appends 'sasl' for secure else 'unsecure'
+ * to namespace prefix to get effective root namespace).
* @param userScopePathPrefix The prefix to use for the user-specific part of the path.
* @param workerPrefix The prefix to use for each worker znode.
+ * @param workerGroup group name to use for all workers
* @param zkSaslLoginContextName SASL login context name for ZK security; null if not needed.
* @param zkPrincipal ZK security principal.
* @param zkKeytab ZK security keytab.
* @param aclsConfig A config setting to use to determine if ACLs should be verified.
*/
public ZkRegistryBase(String instanceName, Configuration conf, String rootNs, String nsPrefix,
- String userScopePathPrefix, String workerPrefix,
+ String userScopePathPrefix, String workerPrefix, String workerGroup,
String zkSaslLoginContextName, String zkPrincipal, String zkKeytab, ConfVars aclsConfig) {
this.conf = new Configuration(conf);
this.saslLoginContextName = zkSaslLoginContextName;
@@ -145,29 +146,52 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
this.disableMessage = "";
}
this.conf.addResource(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
- String zkEnsemble = getQuorumServers(this.conf);
this.encoder = new RegistryUtils.ServiceRecordMarshal();
- int sessionTimeout = (int) HiveConf.getTimeVar(conf,
- ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
- int baseSleepTime = (int) HiveConf.getTimeVar(conf,
- ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS);
- int maxRetries = HiveConf.getIntVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
// sample path: /llap-sasl/hiveuser/hostname/workers/worker-0000000
// worker-0000000 is the sequence number which will be retained until session timeout. If a
// worker does not respond due to communication interruptions it will retain the same sequence
// number when it returns back. If session timeout expires, the node will be deleted and new
// addition of the same node (restart) will get next sequence number
- this.userPathPrefix = userScopePathPrefix + getZkPathUser(this.conf);
- this.workerNodePrefix = workerPrefix;
- this.workersPath = "/" + userPathPrefix + "/" + instanceName + "/workers";
+ final String userPathPrefix = userScopePathPrefix == null ? null : userScopePathPrefix + getZkPathUser(conf);
+ this.workerNodePrefix = workerPrefix == null ? WORKER_PREFIX : workerPrefix;
+ this.workersPath = "/" + PATH_JOINER.join(userPathPrefix, instanceName, workerGroup);
this.instancesCache = null;
this.stateChangeListeners = new HashSet<>();
this.pathToInstanceCache = new ConcurrentHashMap<>();
this.nodeToInstanceCache = new ConcurrentHashMap<>();
+ final String namespace = getRootNamespace(rootNs, nsPrefix);
+ ACLProvider aclProvider;
+ // get acl provider for most outer path that is non-null
+ if (userPathPrefix == null) {
+ if (instanceName == null) {
+ if (workerGroup == null) {
+ aclProvider = getACLProviderForZKPath(namespace);
+ } else {
+ aclProvider = getACLProviderForZKPath(workerGroup);
+ }
+ } else {
+ aclProvider = getACLProviderForZKPath(instanceName);
+ }
+ } else {
+ aclProvider = getACLProviderForZKPath(userScopePathPrefix);
+ }
+ this.zooKeeperClient = getZookeeperClient(conf, namespace, aclProvider);
+ this.zooKeeperClient.getConnectionStateListenable().addListener(new ZkConnectionStateListener());
+ }
+
+ public static String getRootNamespace(String userProvidedNamespace, String defaultNamespacePrefix) {
+ final boolean isSecure = UserGroupInformation.isSecurityEnabled();
+ String rootNs = userProvidedNamespace;
+ if (rootNs == null) {
+ rootNs = defaultNamespacePrefix + (isSecure ? SASL_NAMESPACE : UNSECURE_NAMESPACE);
+ }
+ return rootNs;
+ }
+ private ACLProvider getACLProviderForZKPath(String zkPath) {
final boolean isSecure = UserGroupInformation.isSecurityEnabled();
- ACLProvider zooKeeperAclProvider = new ACLProvider() {
+ return new ACLProvider() {
@Override
public List<ACL> getDefaultAcl() {
// We always return something from getAclForPath so this should not happen.
@@ -177,31 +201,40 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
@Override
public List<ACL> getAclForPath(String path) {
- if (!isSecure || path == null || !path.contains(userPathPrefix)) {
+ if (!isSecure || path == null || !path.contains(zkPath)) {
// No security or the path is below the user path - full access.
return Lists.newArrayList(ZooDefs.Ids.OPEN_ACL_UNSAFE);
}
return createSecureAcls();
}
};
- if (rootNs == null) {
- rootNs = nsPrefix + (isSecure ? SASL_NAMESPACE : UNSECURE_NAMESPACE); // The normal path.
- }
+ }
+
+ private CuratorFramework getZookeeperClient(Configuration conf, String namespace, ACLProvider zooKeeperAclProvider) {
+ String zkEnsemble = getQuorumServers(conf);
+ int sessionTimeout = (int) HiveConf.getTimeVar(conf,
+ ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
+ int connectionTimeout = (int) HiveConf.getTimeVar(conf,
+ ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+ int baseSleepTime = (int) HiveConf.getTimeVar(conf,
+ ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS);
+ int maxRetries = HiveConf.getIntVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
// Create a CuratorFramework instance to be used as the ZooKeeper client
// Use the zooKeeperAclProvider to create appropriate ACLs
- this.zooKeeperClient = CuratorFrameworkFactory.builder()
- .connectString(zkEnsemble)
- .sessionTimeoutMs(sessionTimeout)
- .aclProvider(zooKeeperAclProvider)
- .namespace(rootNs)
- .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries))
- .build();
+ return CuratorFrameworkFactory.builder()
+ .connectString(zkEnsemble)
+ .sessionTimeoutMs(sessionTimeout)
+ .connectionTimeoutMs(connectionTimeout)
+ .aclProvider(zooKeeperAclProvider)
+ .namespace(namespace)
+ .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries))
+ .build();
}
private static List<ACL> createSecureAcls() {
// Read all to the world
- List<ACL> nodeAcls = new ArrayList<ACL>(ZooDefs.Ids.READ_ACL_UNSAFE);
+ List<ACL> nodeAcls = new ArrayList<>(ZooDefs.Ids.READ_ACL_UNSAFE);
// Create/Delete/Write/Admin to creator
nodeAcls.addAll(ZooDefs.Ids.CREATOR_ALL_ACL);
return nodeAcls;
@@ -211,9 +244,9 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
* Get the ensemble server addresses from the configuration. The format is: host1:port,
* host2:port..
*
- * @param conf
+ * @param conf configuration
**/
- private String getQuorumServers(Configuration conf) {
+ private static String getQuorumServers(Configuration conf) {
String[] hosts = conf.getTrimmedStrings(ConfVars.HIVE_ZOOKEEPER_QUORUM.varname);
String port = conf.get(ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.varname,
ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.getDefaultValue());
@@ -238,7 +271,7 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
protected final String registerServiceRecord(ServiceRecord srv) throws IOException {
// restart sensitive instance id
- srv.set(UNIQUE_IDENTIFIER, uniq.toString());
+ srv.set(UNIQUE_IDENTIFIER, UNIQUE_ID.toString());
// Create a znode under the rootNamespace parent for this instance of the server
try {
@@ -275,11 +308,28 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
CloseableUtils.closeQuietly(znode);
throw (e instanceof IOException) ? (IOException)e : new IOException(e);
}
- return uniq.toString();
+ return UNIQUE_ID.toString();
}
+ protected final void updateServiceRecord(ServiceRecord srv) throws IOException {
+ try {
+ znode.setData(encoder.toBytes(srv));
+
+ if (doCheckAcls) {
+ try {
+ checkAndSetAcls();
+ } catch (Exception ex) {
+ throw new IOException("Error validating or setting ACLs. " + disableMessage, ex);
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Unable to update znode with new service record", e);
+ CloseableUtils.closeQuietly(znode);
+ throw (e instanceof IOException) ? (IOException) e : new IOException(e);
+ }
+ }
- protected final void initializeWithoutRegisteringInternal() throws IOException {
+ final void initializeWithoutRegisteringInternal() throws IOException {
// Create a znode under the rootNamespace parent for this instance of the server
try {
try {
@@ -345,8 +395,8 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
private void addToCache(String path, String host, InstanceType instance) {
instanceCacheLock.lock();
try {
- putInCache(path, pathToInstanceCache, instance);
- putInCache(host, nodeToInstanceCache, instance);
+ putInInstanceCache(path, pathToInstanceCache, instance);
+ putInNodeCache(host, nodeToInstanceCache, instance);
} finally {
instanceCacheLock.unlock();
}
@@ -368,14 +418,19 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
path, host, pathToInstanceCache.size(), nodeToInstanceCache.size());
}
- private void putInCache(String key, Map<String, Set<InstanceType>> cache,
+ private void putInInstanceCache(String key, Map<String, InstanceType> cache,
InstanceType instance) {
+ cache.put(key, instance);
+ }
+
+ private void putInNodeCache(String key, Map<String, Set<InstanceType>> cache,
+ InstanceType instance) {
Set<InstanceType> instanceSet = cache.get(key);
if (instanceSet == null) {
- instanceSet = Sets.newHashSet();
- cache.put(key, instanceSet);
+ instanceSet = new HashSet<>();
+ instanceSet.add(instance);
}
- instanceSet.add(instance);
+ cache.put(key, instanceSet);
}
protected final void populateCache(PathChildrenCache instancesCache, boolean doInvokeListeners) {
@@ -403,7 +458,7 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
protected abstract InstanceType createServiceInstance(ServiceRecord srv) throws IOException;
- protected static final byte[] getWorkerData(ChildData childData, String workerNodePrefix) {
+ protected static byte[] getWorkerData(ChildData childData, String workerNodePrefix) {
if (childData == null) return null;
byte[] data = childData.getData();
if (data == null) return null;
@@ -415,8 +470,7 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
private final Logger LOG = LoggerFactory.getLogger(InstanceStateChangeListener.class);
@Override
- public void childEvent(final CuratorFramework client, final PathChildrenCacheEvent event)
- throws Exception {
+ public void childEvent(final CuratorFramework client, final PathChildrenCacheEvent event) {
Preconditions.checkArgument(client != null
&& client.getState() == CuratorFrameworkState.STARTED, "client is not started");
@@ -427,28 +481,32 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
if (!nodeName.startsWith(workerNodePrefix)) return;
LOG.info("{} for zknode {}", event.getType(), childData.getPath());
InstanceType instance = extractServiceInstance(event, childData);
- int ephSeqVersion = extractSeqNum(nodeName);
- switch (event.getType()) {
- case CHILD_ADDED:
- addToCache(childData.getPath(), instance.getHost(), instance);
- for (ServiceInstanceStateChangeListener<InstanceType> listener : stateChangeListeners) {
- listener.onCreate(instance, ephSeqVersion);
- }
- break;
- case CHILD_UPDATED:
- addToCache(childData.getPath(), instance.getHost(), instance);
- for (ServiceInstanceStateChangeListener<InstanceType> listener : stateChangeListeners) {
- listener.onUpdate(instance, ephSeqVersion);
- }
- break;
- case CHILD_REMOVED:
- removeFromCache(childData.getPath(), instance.getHost());
- for (ServiceInstanceStateChangeListener<InstanceType> listener : stateChangeListeners) {
- listener.onRemove(instance, ephSeqVersion);
+ if (instance != null) {
+ int ephSeqVersion = extractSeqNum(nodeName);
+ switch (event.getType()) {
+ case CHILD_ADDED:
+ addToCache(childData.getPath(), instance.getHost(), instance);
+ for (ServiceInstanceStateChangeListener<InstanceType> listener : stateChangeListeners) {
+ listener.onCreate(instance, ephSeqVersion);
+ }
+ break;
+ case CHILD_UPDATED:
+ addToCache(childData.getPath(), instance.getHost(), instance);
+ for (ServiceInstanceStateChangeListener<InstanceType> listener : stateChangeListeners) {
+ listener.onUpdate(instance, ephSeqVersion);
+ }
+ break;
+ case CHILD_REMOVED:
+ removeFromCache(childData.getPath(), instance.getHost());
+ for (ServiceInstanceStateChangeListener<InstanceType> listener : stateChangeListeners) {
+ listener.onRemove(instance, ephSeqVersion);
+ }
+ break;
+ default:
+ // Ignore all the other events; logged above.
}
- break;
- default:
- // Ignore all the other events; logged above.
+ } else {
+ LOG.info("instance is null for event: {} childData: {}", event.getType(), childData);
}
}
}
@@ -464,7 +522,7 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
protected final Set<InstanceType> getByHostInternal(String host) {
Set<InstanceType> byHost = nodeToInstanceCache.get(host);
- byHost = (byHost == null) ? Sets.<InstanceType>newHashSet() : byHost;
+ byHost = (byHost == null) ? Sets.newHashSet() : byHost;
if (LOG.isDebugEnabled()) {
LOG.debug("Returning " + byHost.size() + " hosts for locality allocation on " + host);
}
@@ -472,11 +530,7 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
}
protected final Collection<InstanceType> getAllInternal() {
- Set<InstanceType> instances = new HashSet<>();
- for(Set<InstanceType> instanceSet : pathToInstanceCache.values()) {
- instances.addAll(instanceSet);
- }
- return instances;
+ return new HashSet<>(pathToInstanceCache.values());
}
private static String extractNodeName(ChildData childData) {
@@ -564,13 +618,17 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
CloseableUtils.class.getName();
}
+ protected void unregisterInternal() {
+ CloseableUtils.closeQuietly(znode);
+ }
+
public void stop() {
CloseableUtils.closeQuietly(znode);
CloseableUtils.closeQuietly(instancesCache);
CloseableUtils.closeQuietly(zooKeeperClient);
}
- protected final Set<InstanceType> getInstancesByPath(String path) {
+ protected final InstanceType getInstanceByPath(String path) {
return pathToInstanceCache.get(path);
}
@@ -588,4 +646,12 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
throw e;
}
}
+
+ // for debugging
+ private class ZkConnectionStateListener implements ConnectionStateListener {
+ @Override
+ public void stateChanged(final CuratorFramework curatorFramework, final ConnectionState connectionState) {
+ LOG.info("Connection state change notification received. State: {}", connectionState);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
index 0120639..3aec46b 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
import org.apache.hadoop.hive.llap.tez.Converters;
+import org.apache.hadoop.hive.registry.ServiceInstanceSet;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.NullWritable;
@@ -343,7 +344,7 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
private LlapServiceInstance getServiceInstanceForHost(LlapRegistryService registryService, String host) throws IOException {
InetAddress address = InetAddress.getByName(host);
- LlapServiceInstanceSet instanceSet = registryService.getInstances();
+ ServiceInstanceSet<LlapServiceInstance> instanceSet = registryService.getInstances();
LlapServiceInstance serviceInstance = null;
// The name used in the service registry may not match the host name we're using.
@@ -375,7 +376,7 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
private LlapServiceInstance getServiceInstanceRandom(LlapRegistryService registryService) throws IOException {
- LlapServiceInstanceSet instanceSet = registryService.getInstances();
+ ServiceInstanceSet<LlapServiceInstance> instanceSet = registryService.getInstances();
LlapServiceInstance serviceInstance = null;
LOG.info("Finding random live service instance");
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
index 58bf8dc..b944fad 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
@@ -34,8 +34,10 @@ import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
import org.apache.hadoop.hive.llap.registry.impl.LlapZookeeperRegistryImpl;
+import org.apache.hadoop.hive.registry.ServiceInstanceSet;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.CompositeService;
@@ -230,7 +232,8 @@ public class LlapWebServices extends AbstractService {
}
jg.writeStringField("identity", registry.getWorkerIdentity());
jg.writeArrayFieldStart("peers");
- for (LlapServiceInstance s : registry.getInstances().getAllInstancesOrdered(false)) {
+ ServiceInstanceSet<LlapServiceInstance> instanceSet = registry.getInstances();
+ for (LlapServiceInstance s : ((LlapServiceInstanceSet) instanceSet).getAllInstancesOrdered(false)) {
jg.writeStartObject();
jg.writeStringField("identity", s.getWorkerIdentity());
jg.writeStringField("host", s.getHost());
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index 66de3b8..6ddecca 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -14,6 +14,16 @@
package org.apache.hadoop.hive.llap.tezplugins;
+import com.google.common.io.ByteArrayDataOutput;
+
+import org.apache.hadoop.hive.registry.ServiceInstanceSet;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+import org.apache.hadoop.hive.registry.impl.TezAmRegistryImpl;
+
+import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
index 46cfe56..a051f90 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
@@ -98,13 +98,18 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
protected TezSessionPoolManager() {
}
- public void startPool() throws Exception {
+ public void startPool(HiveConf conf, final WMFullResourcePlan resourcePlan) throws Exception {
if (defaultSessionPool != null) {
defaultSessionPool.start();
}
if (expirationTracker != null) {
expirationTracker.start();
}
+ initTriggers(conf);
+ if (resourcePlan != null) {
+ updateTriggers(resourcePlan);
+ LOG.info("Updated tez session pool manager with active resource plan: {}", resourcePlan.getPlan().getName());
+ }
}
public void setupPool(HiveConf conf) throws Exception {
@@ -157,8 +162,6 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
numConcurrentLlapQueries = conf.getIntVar(ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES);
llapQueue = new Semaphore(numConcurrentLlapQueries, true);
- initTriggers(conf);
-
String queueAllowedStr = HiveConf.getVar(initConf,
ConfVars.HIVE_SERVER2_TEZ_SESSION_CUSTOM_QUEUE_ALLOWED);
try {
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
index d1b3fec..d3748ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
@@ -18,25 +18,20 @@
package org.apache.hadoop.hive.ql.exec.tez;
-import com.google.common.util.concurrent.SettableFuture;
-import org.apache.hadoop.hive.registry.impl.TezAmInstance;
-import org.apache.hadoop.conf.Configuration;
import java.io.IOException;
import java.net.URISyntaxException;
-import java.util.Collection;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+
import javax.security.auth.login.LoginException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
+
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
-import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider;
-import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;
import org.apache.hadoop.hive.registry.impl.TezAmInstance;
import org.apache.tez.dag.api.TezException;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index b98fb58..046ea19 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -738,7 +738,10 @@ public class TezSessionState {
private Path createTezDir(String sessionId, String suffix) throws IOException {
// tez needs its own scratch dir (per session)
// TODO: De-link from SessionState. A TezSession can be linked to different Hive Sessions via the pool.
- Path tezDir = new Path(SessionState.get().getHdfsScratchDirURIString(), TEZ_DIR);
+ SessionState sessionState = SessionState.get();
+ String hdfsScratchDir = sessionState == null ? HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR) : sessionState
+ .getHdfsScratchDirURIString();
+ Path tezDir = new Path(hdfsScratchDir, TEZ_DIR);
tezDir = new Path(tezDir, sessionId + ((suffix == null) ? "" : ("-" + suffix)));
FileSystem fs = tezDir.getFileSystem(conf);
FsPermission fsPermission = new FsPermission(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIRPERMISSION));
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
index bc438bb..1b7321b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
@@ -26,6 +26,7 @@ import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.split.SplitLocationProvider;
@@ -52,7 +53,7 @@ public class Utils {
LOG.info("Using LLAP instance " + serviceRegistry.getApplicationId());
Collection<LlapServiceInstance> serviceInstances =
- serviceRegistry.getInstances().getAllInstancesOrdered(true);
+ serviceRegistry.getInstances().getAllInstancesOrdered(true);
Preconditions.checkArgument(!serviceInstances.isEmpty(),
"No running LLAP daemons! Please check LLAP service status and zookeeper configuration");
ArrayList<String> locations = new ArrayList<>(serviceInstances.size());
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
index a8d729d..0d1990a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
import org.apache.hadoop.hive.llap.registry.impl.InactiveServiceInstance;
import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
+import org.apache.hadoop.hive.registry.ServiceInstanceSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -112,7 +113,7 @@ public class LlapClusterStateForCompile {
return false; // Don't fail; this is best-effort.
}
}
- LlapServiceInstanceSet instances;
+ ServiceInstanceSet<LlapServiceInstance> instances;
try {
instances = svc.getInstances(10);
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
index d261623..d5b683f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
@@ -18,11 +18,14 @@
package org.apache.hadoop.hive.ql.exec.tez;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Random;
+
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.junit.Before;
@@ -90,7 +93,7 @@ public class TestTezSessionPool {
poolManager = new TestTezSessionPoolManager();
poolManager.setupPool(conf);
- poolManager.startPool();
+ poolManager.startPool(conf, null);
// this is now a LIFO operation
// draw 1 and replace
@@ -153,7 +156,7 @@ public class TestTezSessionPool {
poolManager = new TestTezSessionPoolManager();
poolManager.setupPool(conf);
- poolManager.startPool();
+ poolManager.startPool(conf, null);
TezSessionState[] sessions = new TezSessionState[12];
int[] queueCounts = new int[3];
for (int i = 0; i < sessions.length; ++i) {
@@ -234,7 +237,7 @@ public class TestTezSessionPool {
conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES, 2);
poolManager = new TestTezSessionPoolManager();
poolManager.setupPool(conf);
- poolManager.startPool();
+ poolManager.startPool(conf, null);
} catch (Exception e) {
LOG.error("Initialization error", e);
fail();
@@ -295,7 +298,7 @@ public class TestTezSessionPool {
try {
poolManager = new TestTezSessionPoolManager();
poolManager.setupPool(conf);
- poolManager.startPool();
+ poolManager.startPool(conf, null);
} catch (Exception e) {
e.printStackTrace();
fail();
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/service/pom.xml
----------------------------------------------------------------------
diff --git a/service/pom.xml b/service/pom.xml
index 9ad7555..e3774df 100644
--- a/service/pom.xml
+++ b/service/pom.xml
@@ -264,7 +264,48 @@
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-api</artifactId>
+ <version>${tez.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-runtime-library</artifactId>
+ <version>${tez.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commmons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-dag</artifactId>
+ <version>${tez.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commmons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-web-proxy</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>