You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2016/09/02 18:22:11 UTC
hive git commit: HIVE-14589 : add consistent node replacement to LLAP
for splits (Sergey Shelukhin, reviewed by Siddharth Seth)
Repository: hive
Updated Branches:
refs/heads/master fa1d8c760 -> 9365bec58
HIVE-14589 : add consistent node replacement to LLAP for splits (Sergey Shelukhin, reviewed by Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9365bec5
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9365bec5
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9365bec5
Branch: refs/heads/master
Commit: 9365bec584b5a9fbe30f7747b527ca7353dd878e
Parents: fa1d8c7
Author: Sergey Shelukhin <se...@apache.org>
Authored: Fri Sep 2 11:20:45 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Fri Sep 2 11:21:32 2016 -0700
----------------------------------------------------------------------
llap-client/pom.xml | 22 ++
.../hive/llap/registry/ServiceInstanceSet.java | 7 +-
.../registry/impl/LlapFixedRegistryImpl.java | 5 +-
.../impl/LlapZookeeperRegistryImpl.java | 220 +++++++-----
.../hive/llap/registry/impl/SlotZnode.java | 334 +++++++++++++++++++
.../hive/llap/security/LlapTokenClient.java | 4 +-
.../hive/llap/registry/impl/TestSlotZnode.java | 269 +++++++++++++++
.../hive/llap/cli/LlapStatusServiceDriver.java | 21 +-
.../tezplugins/LlapTaskSchedulerService.java | 15 +-
.../apache/hadoop/hive/ql/exec/tez/Utils.java | 4 +-
.../TestHostAffinitySplitLocationProvider.java | 4 +-
11 files changed, 790 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/9365bec5/llap-client/pom.xml
----------------------------------------------------------------------
diff --git a/llap-client/pom.xml b/llap-client/pom.xml
index 0243340..3bacd2b 100644
--- a/llap-client/pom.xml
+++ b/llap-client/pom.xml
@@ -46,6 +46,28 @@
</dependency>
<!-- inter-project -->
<dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>${zookeeper.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ <version>${curator.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>apache-curator</artifactId>
+ <version>${curator.version}</version>
+ <type>pom</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-test</artifactId>
+ <version>${curator.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
http://git-wip-us.apache.org/repos/asf/hive/blob/9365bec5/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
index 99ead9b..13b668d 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
@@ -13,9 +13,8 @@
*/
package org.apache.hadoop.hive.llap.registry;
-import java.io.IOException;
+import java.util.Collection;
import java.util.List;
-import java.util.Map;
import java.util.Set;
public interface ServiceInstanceSet {
@@ -28,14 +27,14 @@ public interface ServiceInstanceSet {
*
* @return
*/
- public Map<String, ServiceInstance> getAll();
+ public Collection<ServiceInstance> getAll();
/**
* Gets a list containing all the instances. This list has the same iteration order across
* different processes, assuming the list of registry entries is the same.
* @return
*/
- public List<ServiceInstance> getAllInstancesOrdered();
+ public Collection<ServiceInstance> getAllInstancesOrdered();
/**
* Get an instance by worker identity.
http://git-wip-us.apache.org/repos/asf/hive/blob/9365bec5/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
index e9456f2..de4d7f2 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
@@ -19,6 +19,7 @@ import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.UnknownHostException;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -222,8 +223,8 @@ public class LlapFixedRegistryImpl implements ServiceRegistry {
}
@Override
- public Map<String, ServiceInstance> getAll() {
- return instances;
+ public Collection<ServiceInstance> getAll() {
+ return instances.values();
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/9365bec5/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
index 64d2617..5e17ebf 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
@@ -21,15 +21,18 @@ import java.net.URISyntaxException;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -45,6 +48,7 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
+import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode.Mode;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.hadoop.conf.Configuration;
@@ -97,13 +101,19 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
private final static String USER_SCOPE_PATH_PREFIX = "user-";
private static final String DISABLE_MESSAGE =
"Set " + ConfVars.LLAP_VALIDATE_ACLS.varname + " to false to disable ACL validation";
+ private static final String WORKER_PREFIX = "worker-";
+ private static final String SLOT_PREFIX = "slot-";
private final Configuration conf;
private final CuratorFramework zooKeeperClient;
- private final String pathPrefix, userPathPrefix;
+ // userPathPrefix is the path specific to the user for which ACLs should be restrictive.
+ // workersPath is the directory path where all the worker znodes are located.
+ private final String userPathPrefix, workersPath;
private String userNameFromPrincipal; // Only set when setting up the secure config for ZK.
private PersistentEphemeralNode znode;
+
+ private SlotZnode slotZnode;
private String znodePath; // unique identity for this instance
private final ServiceRecordMarshal encoder; // to marshal/unmarshal znode data
@@ -147,7 +157,7 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
// number when it returns back. If session timeout expires, the node will be deleted and new
// addition of the same node (restart) will get next sequence number
this.userPathPrefix = USER_SCOPE_PATH_PREFIX + getZkPathUser(this.conf);
- this.pathPrefix = "/" + userPathPrefix + "/" + instanceName + "/workers/worker-";
+ this.workersPath = "/" + userPathPrefix + "/" + instanceName + "/workers";
this.instancesCache = null;
this.instances = null;
this.stateChangeListeners = new HashSet<>();
@@ -292,10 +302,10 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
try {
// PersistentEphemeralNode will make sure the ephemeral node created on server will be present
// even under connection or session interruption (will automatically handle retries)
- znode = new PersistentEphemeralNode(zooKeeperClient,
- PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, pathPrefix, encoder.toBytes(srv));
+ znode = new PersistentEphemeralNode(zooKeeperClient, Mode.EPHEMERAL_SEQUENTIAL,
+ workersPath + "/" + WORKER_PREFIX, encoder.toBytes(srv));
- // start the creation of znode
+ // start the creation of znodes
znode.start();
// We'll wait for 120s for node creation
@@ -306,6 +316,14 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
}
znodePath = znode.getActualPath();
+
+ slotZnode = new SlotZnode(
+ zooKeeperClient, workersPath, SLOT_PREFIX, WORKER_PREFIX, uniq.toString());
+ if (!slotZnode.start(znodeCreationTimeout, TimeUnit.SECONDS)) {
+ throw new Exception(
+ "Max znode creation wait time: " + znodeCreationTimeout + "s exhausted");
+ }
+
if (HiveConf.getBoolVar(conf, ConfVars.LLAP_VALIDATE_ACLS)) {
try {
checkAndSetAcls();
@@ -313,7 +331,6 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
throw new IOException("Error validating or setting ACLs. " + DISABLE_MESSAGE, ex);
}
}
- // Set a watch on the znode
if (zooKeeperClient.checkExists().forPath(znodePath) == null) {
// No node exists, throw exception
throw new Exception("Unable to create znode for this LLAP instance on ZooKeeper.");
@@ -325,24 +342,22 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
} catch (Exception e) {
LOG.error("Unable to create a znode for this server instance", e);
CloseableUtils.closeQuietly(znode);
+ CloseableUtils.closeQuietly(slotZnode);
throw (e instanceof IOException) ? (IOException)e : new IOException(e);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Created zknode with path: {} service record: {}", znodePath, srv);
}
+
return uniq.toString();
}
private void checkAndSetAcls() throws Exception {
if (!UserGroupInformation.isSecurityEnabled()) return;
- String pathToCheck = znodePath;
// We are trying to check ACLs on the "workers" directory, which no one except us should be
// able to write to. Higher-level directories shouldn't matter - we don't read them.
- int ix = pathToCheck.lastIndexOf('/');
- if (ix > 0) {
- pathToCheck = pathToCheck.substring(0, ix);
- }
+ String pathToCheck = workersPath;
List<ACL> acls = zooKeeperClient.getACL().forPath(pathToCheck);
if (acls == null || acls.isEmpty()) {
// Can there be no ACLs? There's some access (to get ACLs), so assume it means free for all.
@@ -506,38 +521,61 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
}
@Override
- public Map<String, ServiceInstance> getAll() {
- Map<String, ServiceInstance> instances = new LinkedHashMap<>();
+ public Collection<ServiceInstance> getAll() {
+ List<ServiceInstance> instances = new ArrayList<>();
// TODO: we could refresh instanceCache here on previous failure
for (ChildData childData : instancesCache.getCurrentData()) {
- if (childData != null) {
- byte[] data = childData.getData();
- if (data != null) {
- try {
- ServiceRecord srv = encoder.fromBytes(childData.getPath(), data);
- ServiceInstance instance = new DynamicServiceInstance(srv);
- instances.put(childData.getPath(), instance);
- } catch (IOException e) {
- LOG.error("Unable to decode data for zkpath: {}." +
- " Ignoring from current instances list..", childData.getPath());
- }
- }
+ if (childData == null) continue;
+ byte[] data = childData.getData();
+ if (data == null) continue;
+ if (!extractNodeName(childData).startsWith(WORKER_PREFIX)) continue;
+ try {
+ ServiceRecord srv = encoder.fromBytes(childData.getPath(), data);
+ ServiceInstance instance = new DynamicServiceInstance(srv);
+ instances.add(instance);
+ } catch (IOException e) {
+ LOG.error("Unable to decode data for zkpath: {}." +
+ " Ignoring from current instances list..", childData.getPath());
}
}
return instances;
}
@Override
- public List<ServiceInstance> getAllInstancesOrdered() {
- List<ServiceInstance> list = new LinkedList<>();
- list.addAll(instances.getAll().values());
- Collections.sort(list, new Comparator<ServiceInstance>() {
- @Override
- public int compare(ServiceInstance o1, ServiceInstance o2) {
- return o2.getWorkerIdentity().compareTo(o2.getWorkerIdentity());
+ public Collection<ServiceInstance> getAllInstancesOrdered() {
+ Map<String, String> slotByWorker = new HashMap<String, String>();
+ List<ServiceInstance> unsorted = new LinkedList<ServiceInstance>();
+ for (ChildData childData : instancesCache.getCurrentData()) {
+ if (childData == null) continue;
+ byte[] data = childData.getData();
+ if (data == null) continue;
+ String nodeName = extractNodeName(childData);
+ if (nodeName.startsWith(WORKER_PREFIX)) {
+ try {
+ ServiceRecord srv = encoder.fromBytes(childData.getPath(), data);
+ ServiceInstance instance = new DynamicServiceInstance(srv);
+ unsorted.add(instance);
+ } catch (IOException e) {
+ LOG.error("Unable to decode data for zkpath: {}." +
+ " Ignoring from current instances list..", childData.getPath());
+ }
+ } else if (nodeName.startsWith(SLOT_PREFIX)) {
+ slotByWorker.put(extractWorkerIdFromSlot(childData), nodeName);
+ } else {
+ LOG.info("Ignoring unknown node {}", childData.getPath());
}
- });
- return list;
+ }
+
+ TreeMap<String, ServiceInstance> sorted = new TreeMap<>();
+ for (ServiceInstance worker : unsorted) {
+ String slot = slotByWorker.get(worker.getWorkerIdentity());
+ if (slot == null) {
+ LOG.info("Unknown slot for {}", worker.getWorkerIdentity());
+ continue;
+ }
+ sorted.put(slot, worker);
+ }
+ return sorted.values();
}
@Override
@@ -563,23 +601,22 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
public Set<ServiceInstance> getByHost(String host) {
Set<ServiceInstance> byHost = new HashSet<>();
for (ChildData childData : instancesCache.getCurrentData()) {
- if (childData != null) {
- byte[] data = childData.getData();
- if (data != null) {
- try {
- ServiceRecord srv = encoder.fromBytes(childData.getPath(), data);
- ServiceInstance instance = new DynamicServiceInstance(srv);
- if (host.equals(instance.getHost())) {
- byHost.add(instance);
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Locality comparing " + host + " to " + instance.getHost());
- }
- } catch (IOException e) {
- LOG.error("Unable to decode data for zkpath: {}." +
- " Ignoring host from current instances list..", childData.getPath());
- }
+ if (childData == null) continue;
+ byte[] data = childData.getData();
+ if (data == null) continue;
+ if (!extractNodeName(childData).startsWith(WORKER_PREFIX)) continue;
+ try {
+ ServiceRecord srv = encoder.fromBytes(childData.getPath(), data);
+ ServiceInstance instance = new DynamicServiceInstance(srv);
+ if (host.equals(instance.getHost())) {
+ byHost.add(instance);
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Locality comparing " + host + " to " + instance.getHost());
+ }
+ } catch (IOException e) {
+ LOG.error("Unable to decode data for zkpath: {}." +
+ " Ignoring host from current instances list..", childData.getPath());
}
}
@@ -595,6 +632,7 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
}
}
+ // TODO: make this class static? As a non-static inner class it captures the enclosing registry and its fields.
private class InstanceStateChangeListener implements PathChildrenCacheListener {
private final Logger LOG = LoggerFactory.getLogger(InstanceStateChangeListener.class);
@@ -605,43 +643,59 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
&& client.getState() == CuratorFrameworkState.STARTED, "client is not started");
synchronized (this) {
- if (!stateChangeListeners.isEmpty()) {
- ServiceInstance instance = null;
- ChildData childData = event.getData();
- if (childData != null) {
- byte[] data = childData.getData();
- if (data != null) {
- try {
- ServiceRecord srv = encoder.fromBytes(event.getData().getPath(), data);
- instance = new DynamicServiceInstance(srv);
- } catch (IOException e) {
- LOG.error("Unable to decode data for zknode: {}." +
- " Dropping notification of type: {}", childData.getPath(), event.getType());
- }
- }
- }
-
- // notify listeners of the new data
- for (ServiceInstanceStateChangeListener listener : stateChangeListeners) {
- if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
- LOG.info("Added zknode {} to llap namespace. Notifying state change listener.",
- event.getData().getPath());
- listener.onCreate(instance);
- } else if (event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED) {
- LOG.info("Updated zknode {} in llap namespace. Notifying state change listener.",
- event.getData().getPath());
- listener.onUpdate(instance);
- } else if (event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) {
- LOG.info("Removed zknode {} from llap namespace. Notifying state change listener.",
- event.getData().getPath());
- listener.onRemove(instance);
- }
+ if (stateChangeListeners.isEmpty()) return;
+ ChildData childData = event.getData();
+ if (childData == null) return;
+ String nodeName = extractNodeName(childData);
+ if (!nodeName.startsWith(WORKER_PREFIX)) return; // No need to propagate slot updates.
+ LOG.info("{} for zknode {} in llap namespace", event.getType(), childData.getPath());
+ ServiceInstance instance = extractServiceInstance(event, childData);
+ for (ServiceInstanceStateChangeListener listener : stateChangeListeners) {
+ switch (event.getType()) {
+ case CHILD_ADDED:
+ listener.onCreate(instance);
+ break;
+ case CHILD_UPDATED:
+ listener.onUpdate(instance);
+ break;
+ case CHILD_REMOVED:
+ listener.onRemove(instance);
+ break;
+ default:
+ // Ignore all the other events; logged above.
}
}
}
}
}
+ private static String extractWorkerIdFromSlot(ChildData childData) {
+ return new String(childData.getData(), SlotZnode.CHARSET);
+ }
+
+ private static String extractNodeName(ChildData childData) {
+ String nodeName = childData.getPath();
+ int ix = nodeName.lastIndexOf("/");
+ if (ix >= 0) {
+ nodeName = nodeName.substring(ix + 1);
+ }
+ return nodeName;
+ }
+
+ private ServiceInstance extractServiceInstance(
+ PathChildrenCacheEvent event, ChildData childData) {
+ byte[] data = childData.getData();
+ if (data == null) return null;
+ try {
+ ServiceRecord srv = encoder.fromBytes(event.getData().getPath(), data);
+ return new DynamicServiceInstance(srv);
+ } catch (IOException e) {
+ LOG.error("Unable to decode data for zknode: {}." +
+ " Dropping notification of type: {}", childData.getPath(), event.getType());
+ return null;
+ }
+ }
+
@Override
public ServiceInstanceSet getInstances(String component) throws IOException {
checkPathChildrenCache();
@@ -669,8 +723,7 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
// lazily create PathChildrenCache
if (instancesCache == null) {
- this.instancesCache = new PathChildrenCache(zooKeeperClient,
- RegistryPathUtils.parentOf(pathPrefix).toString(), true);
+ this.instancesCache = new PathChildrenCache(zooKeeperClient, workersPath, true);
instancesCache.getListenable().addListener(new InstanceStateChangeListener(),
Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
.setDaemon(true)
@@ -698,6 +751,7 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
@Override
public void stop() throws IOException {
CloseableUtils.closeQuietly(znode);
+ CloseableUtils.closeQuietly(slotZnode);
CloseableUtils.closeQuietly(instancesCache);
CloseableUtils.closeQuietly(zooKeeperClient);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9365bec5/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/SlotZnode.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/SlotZnode.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/SlotZnode.java
new file mode 100644
index 0000000..362dd91
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/SlotZnode.java
@@ -0,0 +1,334 @@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.llap.registry.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.TreeSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * We would have used the Curator ephemeral node with some extra logic, but it doesn't handle
+ * the EXISTS condition, which is crucial here; so we copy parts of Curator and add our own logic.
+ */
+public class SlotZnode implements Closeable {
+ static final Charset CHARSET = StandardCharsets.UTF_8;
+ private final static Logger LOG = LoggerFactory.getLogger(SlotZnode.class);
+
+ private final AtomicReference<CountDownLatch> initialCreateLatch =
+ new AtomicReference<CountDownLatch>(new CountDownLatch(1));
+ private final AtomicReference<String> nodePath = new AtomicReference<String>(null);
+ private final Random rdm = new Random();
+ private final CuratorFramework client;
+ private final String basePath, prefix, workerPrefix, dataStr;
+ private final byte[] data;
+ private int currentSlot;
+ private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+ private int fallbackCount = 0; // Test-only counter.
+ private final BackgroundCallback backgroundCallback = new BackgroundCallback() {
+ @Override
+ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
+ processCreateResult(client, event);
+ }
+ };
+ private final Watcher watcher = new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ processWatchedEvent(event);
+ }
+ };
+ private final BackgroundCallback checkExistsCallback = new BackgroundCallback() {
+ @Override
+ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
+ processWatchResult(event);
+ }
+ };
+ private final ConnectionStateListener connectionStateListener = new ConnectionStateListener() {
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState) {
+ processConnectionState(newState);
+ }
+ };
+
+ private enum State {
+ LATENT,
+ INITIAL_SELECTION,
+ AFTER_SELECTION,
+ CLOSED
+ }
+
+ public SlotZnode(
+ CuratorFramework client, String basePath, String prefix, String workerPrefix, String data) {
+ this.client = Preconditions.checkNotNull(client, "client cannot be null");
+ this.basePath = Preconditions.checkNotNull(basePath, "basePath cannot be null");
+ this.prefix = Preconditions.checkNotNull(prefix, "prefix cannot be null");
+ this.workerPrefix = workerPrefix;
+ Preconditions.checkNotNull(data, "data cannot be null");
+ this.dataStr = data;
+ this.data = data.getBytes(CHARSET);
+ }
+
+ @VisibleForTesting
+ public int getFallbackCount() {
+ return fallbackCount;
+ }
+
+ private void chooseSlotToTake() throws Exception {
+ int slotToTake = -1;
+ while (true) {
+ List<String> allChildNodes;
+ try {
+ allChildNodes = client.getChildren().forPath(basePath);
+ } catch (Exception e) {
+ LOG.error("Cannot list nodes to get slots; failing", e);
+ throw e;
+ }
+ TreeSet<Integer> slots = new TreeSet<>();
+ int approxWorkerCount = 0;
+ for (String child : allChildNodes) {
+ if (!child.startsWith(prefix)) {
+ if (child.startsWith(workerPrefix)) {
+ ++approxWorkerCount;
+ }
+ } else {
+ slots.add(Integer.parseInt(child.substring(prefix.length())));
+ }
+ }
+ Iterator<Integer> slotIter = slots.iterator();
+ slotToTake = 0;
+ while (slotIter.hasNext()) {
+ int nextTaken = slotIter.next();
+ if (slotToTake < nextTaken) break;
+ slotToTake = nextTaken + 1;
+ }
+ // There may be a race for this slot so re-query after a delay with some probability.
+ if (slotToTake != currentSlot || !shouldFallBackOnCollision(approxWorkerCount)) break;
+ ++fallbackCount;
+ Thread.sleep(rdm.nextInt(200)); // arbitrary
+ }
+
+ currentSlot = slotToTake;
+ LOG.info("Will attempt to take slot " + currentSlot);
+ }
+
+
+ private boolean shouldFallBackOnCollision(int approxWorkerCount) {
+ // Ideally, exactly 1 worker would retry each contended slot; e.g. with 4 workers we'd want 3
+ // to re-read, i.e. fall back when 1/4 <= random([0,1)) (probability 0.75). However, we double
+ // the chance of NOT falling back (2/N rather than 1/N) to avoid too much re-reading. Hand-wavy.
+ if (approxWorkerCount == 0) return false;
+ return (2.0f / approxWorkerCount) <= rdm.nextDouble();
+ }
+
+ private String getSlotPath(int slot) {
+ return String.format("%s/%s%010d", basePath, prefix, slot);
+ }
+
+ public boolean start(long timeout, TimeUnit unit) throws Exception {
+ Preconditions.checkState(state.compareAndSet(State.LATENT, State.INITIAL_SELECTION), "Already started");
+ CountDownLatch localLatch = initialCreateLatch.get();
+ client.getConnectionStateListenable().addListener(connectionStateListener);
+ chooseSlotToTake();
+ startCreateCurrentNode();
+ return localLatch.await(timeout, unit);
+ }
+
+ @Override
+ public void close() throws IOException {
+ State currentState = state.getAndSet(State.CLOSED);
+ if (currentState == State.CLOSED || currentState == State.LATENT) return;
+ client.getConnectionStateListenable().removeListener(connectionStateListener);
+ String localNodePath = nodePath.getAndSet(null);
+ if (localNodePath == null) return;
+ try {
+ client.delete().guaranteed().forPath(localNodePath);
+ } catch (KeeperException.NoNodeException ignore) {
+ } catch (Exception e) {
+ LOG.error("Deleting node: " + localNodePath, e);
+ throw new IOException(e);
+ }
+ }
+
+ public int getCurrentSlot() {
+ assert isActive();
+ return currentSlot;
+ }
+
+ private void startCreateCurrentNode() {
+ if (!isActive()) return;
+ String createPath = null;
+ try {
+ createPath = getSlotPath(currentSlot);
+ LOG.info("Attempting to create " + createPath);
+ client.create().withMode(CreateMode.EPHEMERAL).inBackground(backgroundCallback)
+ .forPath(createPath, data);
+ } catch (Exception e) {
+ LOG.error("Creating node. Path: " + createPath, e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void watchNode() throws Exception {
+ if (!isActive()) return;
+ String localNodePath = nodePath.get();
+ if (localNodePath == null) return;
+ try {
+ client.checkExists().usingWatcher(watcher).inBackground(
+ checkExistsCallback).forPath(localNodePath);
+ } catch (Exception e) {
+ LOG.error("Watching node: " + localNodePath, e);
+ throw e;
+ }
+ }
+
+ private boolean isActive() {
+ State localState = state.get();
+ return (localState != State.LATENT && localState != State.CLOSED);
+ }
+
+ private void processWatchResult(CuratorEvent event) throws Exception {
+ if (event.getResultCode() != KeeperException.Code.NONODE.intValue()) return;
+ LOG.info("Trying to reacquire because of the NONODE event");
+ startCreateCurrentNode();
+ }
+
+
+ private void processConnectionState(ConnectionState newState) {
+ if (newState != ConnectionState.RECONNECTED) return;
+ LOG.info("Trying to reacquire because of the RECONNECTED event");
+ startCreateCurrentNode();
+ }
+
+
+ private void processWatchedEvent(WatchedEvent event) {
+ if (event.getType() != EventType.NodeDeleted) return;
+ String localPath = nodePath.get();
+ if (localPath == null) return;
+ if (!localPath.equals(event.getPath())) {
+ LOG.info("Ignoring the NodeDeleted event for " + event.getPath());
+ return;
+ }
+ LOG.info("Trying to reacquire because of the NodeDeleted event");
+ startCreateCurrentNode();
+ }
+
+
+ private void processCreateResult(CuratorFramework client, CuratorEvent event) throws Exception {
+ boolean doesExist = event.getResultCode() == KeeperException.Code.NODEEXISTS.intValue();
+ if (!doesExist && event.getResultCode() != KeeperException.Code.OK.intValue()) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Trying to reacquire due to create error: " + event);
+ }
+ startCreateCurrentNode(); // TODO: a pattern from Curator. Better error handling?
+ return;
+ }
+ State localState = state.get();
+ switch (localState) {
+ case CLOSED:
+ case LATENT:
+ return;
+ case INITIAL_SELECTION:
+ if (doesExist) {
+ LOG.info("Slot " + currentSlot + " was occupied");
+ chooseSlotToTake(); // Try another slot.
+ startCreateCurrentNode();
+ } else {
+ handleCreatedNode(event.getName());
+ }
+ break;
+ case AFTER_SELECTION:
+ if (doesExist) {
+ processExistsFromCreate(client, event.getPath());
+ } else {
+ handleCreatedNode(event.getName());
+ }
+ break;
+ default:
+ throw new AssertionError("Unknown state " + localState);
+ }
+ }
+
+
+ private void processExistsFromCreate(CuratorFramework client, String path) throws Exception {
+ byte[] actual;
+ try {
+ actual = client.getData().forPath(path);
+ } catch (Exception ex) {
+ LOG.error("Error getting data for the node; will retry creating", ex);
+ startCreateCurrentNode();
+ return;
+ }
+ if (Arrays.equals(actual, data)) {
+ handleCreatedNode(path);
+ } else {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Data at {} is from a different node: {} (we are {})",
+ path, new String(actual, CHARSET), dataStr);
+ }
+ nodePath.getAndSet(null);
+ chooseSlotToTake(); // Try another slot.
+ startCreateCurrentNode();
+ }
+ }
+
+ private void handleCreatedNode(String path) throws Exception {
+ while (true) {
+ State localState = state.get();
+ if (localState == State.CLOSED || localState == State.LATENT) return;
+ if (state.compareAndSet(localState, State.AFTER_SELECTION)) break;
+ }
+ nodePath.set(path);
+ watchNode();
+ CountDownLatch localLatch = initialCreateLatch.getAndSet(null);
+ if (localLatch != null) {
+ localLatch.countDown();
+ }
+ LOG.info("Acquired the slot znode {}{}", path,
+ localLatch != null ? "; this is the initial assignment" : "");
+ }
+
+ @VisibleForTesting
+ public String getActualPath() {
+ return nodePath.get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9365bec5/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
index 921e050..ace9475 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
@@ -137,11 +137,11 @@ public class LlapTokenClient {
registry.start();
activeInstances = registry.getInstances();
}
- Map<String, ServiceInstance> daemons = activeInstances.getAll();
+ Collection<ServiceInstance> daemons = activeInstances.getAll();
if (daemons == null || daemons.isEmpty()) {
throw new RuntimeException("No LLAPs found");
}
- lastKnownInstances = daemons.values();
+ lastKnownInstances = daemons;
return new ArrayList<ServiceInstance>(lastKnownInstances);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9365bec5/llap-client/src/test/org/apache/hadoop/hive/llap/registry/impl/TestSlotZnode.java
----------------------------------------------------------------------
diff --git a/llap-client/src/test/org/apache/hadoop/hive/llap/registry/impl/TestSlotZnode.java b/llap-client/src/test/org/apache/hadoop/hive/llap/registry/impl/TestSlotZnode.java
new file mode 100644
index 0000000..209168f
--- /dev/null
+++ b/llap-client/src/test/org/apache/hadoop/hive/llap/registry/impl/TestSlotZnode.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.llap.registry.impl;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+
+import org.apache.curator.test.KillSession;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.utils.DebugUtils;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.BindException;
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for SlotZnode, run against an in-process ZooKeeper {@link TestingServer}.
+ * Some tests (and the setup) are copied or adapted from Curator.
+ */
+public class TestSlotZnode {
+  private static final Logger LOG = LoggerFactory.getLogger(TestSlotZnode.class);
+
+  private static final String DIR = "/test";
+  private static final String PATH = ZKPaths.makePath(DIR, "/foo");
+
+  /** Every client handed out by {@link #newCurator()}; closed in {@link #teardown()}. */
+  private final Collection<CuratorFramework> curatorInstances = Lists.newArrayList();
+  /** Every node handed out by {@link #createZnode(CuratorFramework)}; closed in teardown. */
+  private final Collection<SlotZnode> createdNodes = Lists.newArrayList();
+  private TestingServer server;
+
+  /** Starts the test ZooKeeper server, retrying while the chosen port cannot be bound. */
+  @Before
+  public void setUp() throws Exception {
+    System.setProperty(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES, "true");
+    while (server == null) {
+      try {
+        server = new TestingServer();
+      } catch (BindException e) {
+        LOG.warn("Getting bind exception - retrying to allocate server");
+        server = null;
+      }
+    }
+  }
+
+  /** Closes all nodes and clients created by the test, then stops the server. */
+  @After
+  public void teardown() throws Exception {
+    try {
+      for (SlotZnode node : createdNodes) {
+        node.close();
+      }
+      for (CuratorFramework curator : curatorInstances) {
+        curator.close();
+      }
+    } finally {
+      // The server must go away even if a close above threw; it is still null when setUp
+      // failed with something other than a BindException.
+      if (server != null) {
+        server.close();
+        server = null;
+      }
+    }
+  }
+
+  @Test
+  public void testDeletesNodeWhenClosed() throws Exception {
+    CuratorFramework curator = newCurator();
+    SlotZnode node = createZnode(curator);
+    assertTrue(node.start(5, TimeUnit.SECONDS));
+    String path = null;
+    try {
+      path = node.getActualPath();
+      assertNodeExists(curator, path);
+    } finally {
+      node.close(); // After closing the path is set to null...
+    }
+    assertNodeDoesNotExist(curator, path);
+  }
+
+  @Test
+  public void testDeletedAndRecreatedNodeWhenSessionReconnects() throws Exception {
+    CuratorFramework curator = newCurator();
+    CuratorFramework observer = newCurator();
+    SlotZnode node = createZnode(curator);
+    assertTrue(node.start(5, TimeUnit.SECONDS));
+    String originalPath = node.getActualPath();
+    assertNodeExists(observer, originalPath);
+    Trigger deletedTrigger = Trigger.deleted();
+    observer.checkExists().usingWatcher(deletedTrigger).forPath(node.getActualPath());
+    killSession(curator);
+    // Make sure the node got deleted.
+    assertTrue(deletedTrigger.firedWithin(5, TimeUnit.SECONDS));
+    // Check for it to be recreated; it may already be back by the time the watch is set.
+    Trigger createdTrigger = Trigger.created();
+    Stat stat = observer.checkExists().usingWatcher(createdTrigger).forPath(originalPath);
+    assertTrue(stat != null || createdTrigger.firedWithin(5, TimeUnit.SECONDS));
+  }
+
+  @Test
+  public void testRecreatesNodeWhenItGetsDeleted() throws Exception {
+    CuratorFramework curator = newCurator();
+    SlotZnode node = createZnode(curator);
+    assertTrue(node.start(5, TimeUnit.SECONDS));
+    String originalNode = node.getActualPath();
+    assertNodeExists(curator, originalNode);
+    // Delete the original node...
+    curator.delete().forPath(originalNode);
+    // ...and expect it to come back; it may already be back by the time the watch is set.
+    Trigger createdWatchTrigger = Trigger.created();
+    Stat stat = curator.checkExists().usingWatcher(createdWatchTrigger).forPath(originalNode);
+    assertTrue(stat != null || createdWatchTrigger.firedWithin(5, TimeUnit.SECONDS));
+  }
+
+  @Test
+  public void testPathUsage() throws Exception {
+    CuratorFramework curator = newCurator();
+    SlotZnode node1 = createZnode(curator);
+    SlotZnode node2 = createZnode(curator);
+    SlotZnode node3 = createZnode(curator);
+    assertTrue(node1.start(5, TimeUnit.SECONDS));
+    String path1 = node1.getActualPath();
+    assertTrue(node2.start(5, TimeUnit.SECONDS));
+    String path2 = node2.getActualPath();
+    assertFalse("Two live nodes share the path " + path1, path1.equals(path2));
+    node1.close();
+    // Path must be reused.
+    assertTrue(node3.start(5, TimeUnit.SECONDS));
+    assertEquals(path1, node3.getActualPath());
+  }
+
+  /**
+   * Start barrier for worker threads: reports this thread as ready on {@code cdlIn}, then
+   * blocks until {@code cdlOut} is released.
+   */
+  private void syncThreadStart(final CountDownLatch cdlIn, final CountDownLatch cdlOut) {
+    cdlIn.countDown();
+    try {
+      cdlOut.await();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // Keep the interrupt visible to the executor.
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Test
+  public void testConcurrencyAndFallback() throws Exception {
+    concurrencyTest(100, true);
+  }
+
+  @Test
+  public void testConcurrencyNoFallback() throws Exception {
+    concurrencyTest(100, false);
+  }
+
+  /**
+   * Starts {@code nodeCount} slot znodes at the same time and verifies that each one ends up
+   * in a distinct slot below {@code nodeCount}.
+   *
+   * @param nodeCount the number of nodes (and worker threads) to start concurrently
+   * @param isFallback whether to pre-create "worker-N" znodes under the test path before
+   *                   starting; in that mode at least one fallback must have been counted
+   */
+  private void concurrencyTest(final int nodeCount, boolean isFallback) throws Exception {
+    final CuratorFramework curator = newCurator();
+    final CountDownLatch cdlIn = new CountDownLatch(nodeCount);
+    final CountDownLatch cdlOut = new CountDownLatch(1);
+    @SuppressWarnings("unchecked")
+    FutureTask<SlotZnode>[] startTasks = new FutureTask[nodeCount];
+    for (int i = 0; i < nodeCount; ++i) {
+      if (isFallback) {
+        curator.create().creatingParentsIfNeeded().forPath(PATH + "/worker-" + i);
+      }
+      final SlotZnode node = createZnode(curator);
+      startTasks[i] = new FutureTask<SlotZnode>(new Callable<SlotZnode>() {
+        @Override
+        public SlotZnode call() throws Exception {
+          syncThreadStart(cdlIn, cdlOut);
+          return node.start(5, TimeUnit.SECONDS) ? node : null;
+        }
+      });
+    }
+    // One thread per node, so that all of them can sit at the start barrier together.
+    ExecutorService executor = Executors.newFixedThreadPool(nodeCount);
+    try {
+      for (int i = 0; i < startTasks.length; ++i) {
+        executor.execute(startTasks[i]);
+      }
+      cdlIn.await();
+      cdlOut.countDown();
+      boolean[] found = new boolean[nodeCount];
+      int totalFallbackCount = 0;
+      for (int i = 0; i < startTasks.length; ++i) {
+        SlotZnode node = startTasks[i].get();
+        assertNotNull(node);
+        totalFallbackCount += node.getFallbackCount();
+        int slot = node.getCurrentSlot();
+        assertTrue("Unexpected slot " + slot, slot >= 0 && slot < found.length);
+        assertFalse(found[slot]); // Given these 2 lines we don't need to double check later.
+        found[slot] = true;
+      }
+      if (isFallback) {
+        LOG.info("Total fallback count " + totalFallbackCount);
+        assertTrue(totalFallbackCount > 0);
+      }
+    } finally {
+      // Without this every run leaves nodeCount idle pool threads behind.
+      executor.shutdownNow();
+    }
+  }
+
+  /** Creates and starts a client for the test server; it is closed in teardown. */
+  private CuratorFramework newCurator() throws IOException {
+    CuratorFramework client = CuratorFrameworkFactory.newClient(
+        server.getConnectString(), 10000, 10000, new RetryOneTime(1));
+    client.start();
+    curatorInstances.add(client);
+    return client;
+  }
+
+  /** Creates (but does not start) a slot znode under the test path; closed in teardown. */
+  private SlotZnode createZnode(CuratorFramework curator) throws Exception {
+    if (curator.checkExists().forPath(PATH) == null) {
+      curator.create().creatingParentsIfNeeded().forPath(PATH);
+    }
+    SlotZnode result = new SlotZnode(curator, PATH, "slot-", "worker-", "");
+    createdNodes.add(result);
+    return result;
+  }
+
+  private void assertNodeExists(CuratorFramework curator, String path) throws Exception {
+    assertNotNull(path);
+    assertNotNull(curator.checkExists().forPath(path));
+  }
+
+  private void assertNodeDoesNotExist(CuratorFramework curator, String path) throws Exception {
+    assertNull(curator.checkExists().forPath(path));
+  }
+
+  /** Kills the ZooKeeper session that backs the given client. */
+  public void killSession(CuratorFramework curator) throws Exception {
+    KillSession.kill(curator.getZookeeperClient().getZooKeeper(),
+        curator.getZookeeperClient().getCurrentConnectionString());
+  }
+
+  /** A one-shot watcher whose latch opens on the first event of the expected type. */
+  private static final class Trigger implements Watcher {
+    private final Event.EventType type;
+    private final CountDownLatch latch;
+
+    public Trigger(Event.EventType type) {
+      assertNotNull(type);
+      this.type = type;
+      this.latch = new CountDownLatch(1);
+    }
+
+    @Override
+    public void process(WatchedEvent event) {
+      if (type == event.getType()) {
+        latch.countDown();
+      }
+    }
+
+    /** Returns whether an event of the expected type arrived within the given time. */
+    public boolean firedWithin(long duration, TimeUnit unit) {
+      try {
+        return latch.await(duration, unit);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // Preserve the interrupt for the caller.
+        throw Throwables.propagate(e);
+      }
+    }
+
+    private static Trigger created() {
+      return new Trigger(Event.EventType.NodeCreated);
+    }
+
+    private static Trigger deleted() {
+      return new Trigger(Event.EventType.NodeDeleted);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9365bec5/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java
index 17ce69b..0efe545 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java
@@ -24,6 +24,7 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
+import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.LinkedList;
@@ -484,34 +485,30 @@ public class LlapStatusServiceDriver {
"Failed to create llap registry client", e);
}
try {
- Map<String, ServiceInstance> serviceInstanceMap;
+ Collection<ServiceInstance> serviceInstances;
try {
- serviceInstanceMap = llapRegistry.getInstances().getAll();
+ serviceInstances = llapRegistry.getInstances().getAll();
} catch (IOException e) {
throw new LlapStatusCliException(ExitCode.LLAP_REGISTRY_ERROR, "Failed to get instances from llap registry", e);
}
- if (serviceInstanceMap == null || serviceInstanceMap.isEmpty()) {
+ if (serviceInstances == null || serviceInstances.isEmpty()) {
LOG.info("No information found in the LLAP registry");
appStatusBuilder.setLiveInstances(0);
appStatusBuilder.setState(State.LAUNCHING);
appStatusBuilder.clearLlapInstances();
return ExitCode.SUCCESS;
} else {
-
-
// Tracks instances known by both slider and llap.
List<LlapInstance> validatedInstances = new LinkedList<>();
List<String> llapExtraInstances = new LinkedList<>();
- for (Map.Entry<String, ServiceInstance> serviceInstanceEntry : serviceInstanceMap
- .entrySet()) {
-
- ServiceInstance serviceInstance = serviceInstanceEntry.getValue();
- String containerIdString = serviceInstance.getProperties().get(HiveConf.ConfVars.LLAP_DAEMON_CONTAINER_ID.varname);
-
+ for (ServiceInstance serviceInstance : serviceInstances) {
+ String containerIdString = serviceInstance.getProperties().get(
+ HiveConf.ConfVars.LLAP_DAEMON_CONTAINER_ID.varname);
- LlapInstance llapInstance = appStatusBuilder.removeAndgetLlapInstanceForContainer(containerIdString);
+ LlapInstance llapInstance = appStatusBuilder.removeAndgetLlapInstanceForContainer(
+ containerIdString);
if (llapInstance != null) {
llapInstance.setMgmtPort(serviceInstance.getManagementPort());
llapInstance.setRpcPort(serviceInstance.getRpcPort());
http://git-wip-us.apache.org/repos/asf/hive/blob/9365bec5/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index efd774d..10d9ad1 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -313,7 +313,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
registry.start();
registry.registerStateChangeListener(new NodeStateChangeListener());
activeInstances = registry.getInstances();
- for (ServiceInstance inst : activeInstances.getAll().values()) {
+ for (ServiceInstance inst : activeInstances.getAll()) {
addNode(inst, new NodeInfo(inst, nodeBlacklistConf, clock, numSchedulableTasksPerNode,
metrics));
}
@@ -326,21 +326,21 @@ public class LlapTaskSchedulerService extends TaskScheduler {
private final Logger LOG = LoggerFactory.getLogger(NodeStateChangeListener.class);
@Override
- public void onCreate(final ServiceInstance serviceInstance) {
+ public void onCreate(ServiceInstance serviceInstance) {
addNode(serviceInstance, new NodeInfo(serviceInstance, nodeBlacklistConf, clock,
numSchedulableTasksPerNode, metrics));
LOG.info("Added node with identity: {}", serviceInstance.getWorkerIdentity());
}
@Override
- public void onUpdate(final ServiceInstance serviceInstance) {
+ public void onUpdate(ServiceInstance serviceInstance) {
instanceToNodeMap.put(serviceInstance.getWorkerIdentity(), new NodeInfo(serviceInstance,
nodeBlacklistConf, clock, numSchedulableTasksPerNode, metrics));
LOG.info("Updated node with identity: {}", serviceInstance.getWorkerIdentity());
}
@Override
- public void onRemove(final ServiceInstance serviceInstance) {
+ public void onRemove(ServiceInstance serviceInstance) {
// FIXME: disabling this for now
// instanceToNodeMap.remove(serviceInstance.getWorkerIdentity());
LOG.info("Removed node with identity: {}", serviceInstance.getWorkerIdentity());
@@ -444,7 +444,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
readLock.lock();
try {
int numInstancesFound = 0;
- for (ServiceInstance inst : activeInstances.getAll().values()) {
+ for (ServiceInstance inst : activeInstances.getAll()) {
if (inst.isAlive()) {
Resource r = inst.getResource();
memory += r.getMemory();
@@ -494,7 +494,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
readLock.lock();
try {
int n = 0;
- for (ServiceInstance inst : activeInstances.getAll().values()) {
+ for (ServiceInstance inst : activeInstances.getAll()) {
if (inst.isAlive()) {
n++;
}
@@ -811,7 +811,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
/* check again whether nodes are disabled or just missing */
writeLock.lock();
try {
- for (ServiceInstance inst : activeInstances.getAll().values()) {
+ for (ServiceInstance inst : activeInstances.getAll()) {
if (inst.isAlive() && instanceToNodeMap.containsKey(inst.getWorkerIdentity()) == false) {
/* that's a good node, not added to the allocations yet */
LOG.info("Found a new node: " + inst + ".");
@@ -825,7 +825,6 @@ public class LlapTaskSchedulerService extends TaskScheduler {
}
private void addNode(ServiceInstance inst, NodeInfo node) {
- LOG.info("Adding node: " + inst);
// we have just added a new node. Signal timeout monitor to reset timer
if (activeInstances.size() == 1) {
LOG.info("New node added. Signalling scheduler timeout monitor thread to stop timer.");
http://git-wip-us.apache.org/repos/asf/hive/blob/9365bec5/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
index 8a4fc08..2e9918e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
@@ -18,7 +18,7 @@
package org.apache.hadoop.hive.ql.exec.tez;
import java.io.IOException;
-import java.util.List;
+import java.util.Collection;
import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
@@ -40,7 +40,7 @@ public class Utils {
LlapRegistryService serviceRegistry;
serviceRegistry = LlapRegistryService.getClient(conf);
- List<ServiceInstance> serviceInstances =
+ Collection<ServiceInstance> serviceInstances =
serviceRegistry.getInstances().getAllInstancesOrdered();
String[] locations = new String[serviceInstances.size()];
int i = 0;
http://git-wip-us.apache.org/repos/asf/hive/blob/9365bec5/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
index 54f7363..7ed3df1 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
@@ -131,9 +131,9 @@ public class TestHostAffinitySplitLocationProvider {
newMsg = newLoc + " splits went to the new node";
LOG.info(movedMsg + " and " + newMsg + msgTail);
double maxMoved = 1.0f * splits.length / locs, minNew = 1.0f * splits.length / locs;
- movedRatioSum += Math.max(moved / maxMoved, 1f);
+ movedRatioSum += moved / maxMoved;
movedRatioWorst = Math.max(moved / maxMoved, movedRatioWorst);
- newRatioSum += Math.min(newLoc / minNew, 1f);
+ newRatioSum += newLoc / minNew;
newRatioWorst = Math.min(newLoc / minNew, newRatioWorst);
logBadRatios(failBuilder, moved, newLoc, msgTail, movedMsg, newMsg, maxMoved, minNew);
}