You are viewing a plain-text version of this message; the canonical version is the original post in the commits@hive.apache.org mailing-list archive.
Posted to commits@hive.apache.org by jd...@apache.org on 2016/05/06 17:25:01 UTC

[11/39] hive git commit: HIVE-13305: LlapInputFormat should get LlapOutputFormatService port from the LLAP service registry

HIVE-13305: LlapInputFormat should get LlapOutputFormatService port from the LLAP service registry


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/25140659
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/25140659
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/25140659

Branch: refs/heads/master
Commit: 25140659c578e3e01a3ce36ba5108a38303dc843
Parents: 2945c3b
Author: Jason Dere <jd...@hortonworks.com>
Authored: Thu Mar 17 16:12:34 2016 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Thu Mar 17 16:12:34 2016 -0700

----------------------------------------------------------------------
 .../hive/llap/registry/ServiceInstance.java     |  7 ++
 .../registry/impl/LlapFixedRegistryImpl.java    |  7 ++
 .../impl/LlapZookeeperRegistryImpl.java         | 16 ++++
 .../hadoop/hive/llap/LlapInputFormat.java       | 78 ++++++++++++++++++--
 4 files changed, 102 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/25140659/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java
index 2bd860a..a504146 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java
@@ -54,6 +54,13 @@ public interface ServiceInstance {
   public int getShufflePort();
 
   /**
+   * OutputFormat endpoint for service instance
+   *
+   * @return
+   */
+  public int getOutputFormatPort();
+
+  /**
    * Return the last known state (without refreshing)
    * 
    * @return

http://git-wip-us.apache.org/repos/asf/hive/blob/25140659/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
index 8cace8f..33ab591 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
@@ -52,6 +52,7 @@ public class LlapFixedRegistryImpl implements ServiceRegistry {
   private final int port;
   private final int shuffle;
   private final int mngPort;
+  private final int outputFormatPort;
   private final String[] hosts;
   private final int memory;
   private final int vcores;
@@ -65,6 +66,7 @@ public class LlapFixedRegistryImpl implements ServiceRegistry {
     this.shuffle = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_YARN_SHUFFLE_PORT);
     this.resolveHosts = conf.getBoolean(FIXED_REGISTRY_RESOLVE_HOST_NAMES, true);
     this.mngPort = HiveConf.getIntVar(conf, ConfVars.LLAP_MANAGEMENT_RPC_PORT);
+    this.outputFormatPort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT);
 
     for (Map.Entry<String, String> kv : conf) {
       if (kv.getKey().startsWith(HiveConf.PREFIX_LLAP)
@@ -151,6 +153,11 @@ public class LlapFixedRegistryImpl implements ServiceRegistry {
     }
 
     @Override
+    public int getOutputFormatPort() {
+      return LlapFixedRegistryImpl.this.outputFormatPort;
+    }
+
+    @Override
     public boolean isAlive() {
       return true;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/25140659/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
index ab9fa39..c440e1e 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
@@ -90,6 +90,7 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
   private static final String IPC_MNG = "llapmng";
   private static final String IPC_SHUFFLE = "shuffle";
   private static final String IPC_LLAP = "llap";
+  private static final String IPC_OUTPUTFORMAT = "llapoutputformat";
   private final static String ROOT_NAMESPACE = "llap";
 
   private final Configuration conf;
@@ -241,6 +242,11 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
         HiveConf.getIntVar(conf, ConfVars.LLAP_MANAGEMENT_RPC_PORT)));
   }
 
+  public Endpoint getOutputFormatEndpoint() {
+    return RegistryTypeUtils.ipcEndpoint(IPC_OUTPUTFORMAT, new InetSocketAddress(hostname,
+        HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT)));
+  }
+
   @Override
   public void register() throws IOException {
     ServiceRecord srv = new ServiceRecord();
@@ -310,6 +316,7 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
     private final int rpcPort;
     private final int mngPort;
     private final int shufflePort;
+    private final int outputFormatPort;
 
     public DynamicServiceInstance(ServiceRecord srv) throws IOException {
       this.srv = srv;
@@ -317,6 +324,7 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
       final Endpoint shuffle = srv.getInternalEndpoint(IPC_SHUFFLE);
       final Endpoint rpc = srv.getInternalEndpoint(IPC_LLAP);
       final Endpoint mng = srv.getInternalEndpoint(IPC_MNG);
+      final Endpoint outputFormat = srv.getInternalEndpoint(IPC_OUTPUTFORMAT);
 
       this.host =
           RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
@@ -330,6 +338,9 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
       this.shufflePort =
           Integer.valueOf(RegistryTypeUtils.getAddressField(shuffle.addresses.get(0),
               AddressTypes.ADDRESS_PORT_FIELD));
+      this.outputFormatPort =
+          Integer.valueOf(RegistryTypeUtils.getAddressField(outputFormat.addresses.get(0),
+              AddressTypes.ADDRESS_PORT_FIELD));
     }
 
     @Override
@@ -386,6 +397,11 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
       return mngPort;
     }
 
+    @Override
+    public int getOutputFormatPort() {
+      return outputFormatPort;
+    }
+
     // Relying on the identity hashCode and equality, since refreshing instances retains the old copy
     // of an already known instance.
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/25140659/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
index b32d662..847c74f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
@@ -17,10 +17,12 @@
 package org.apache.hadoop.hive.llap;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.Set;
 
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
@@ -28,6 +30,9 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
 import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient;
+import org.apache.hadoop.hive.llap.registry.ServiceInstance;
+import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
+import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.apache.hadoop.hive.llap.tez.Converters;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -72,8 +77,12 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
     LlapInputSplit llapSplit = (LlapInputSplit) split;
     SubmitWorkInfo submitWorkInfo = SubmitWorkInfo.fromBytes(llapSplit.getPlanBytes());
 
-    // TODO HACK: Spark is built with Hive-1.2.1, does not have access to HiveConf.ConfVars.LLAP_DAEMON_RPC_PORT
-    int llapSubmitPort = job.getInt("hive.llap.daemon.rpc.port", 15001);
+    ServiceInstance serviceInstance = getServiceInstance(job, llapSplit);
+    String host = serviceInstance.getHost();
+    int llapSubmitPort = serviceInstance.getRpcPort();
+
+    LOG.info("Found service instance for host " + host + " with rpc port " + llapSubmitPort
+        + " and outputformat port " + serviceInstance.getOutputFormatPort());
 
     LlapTaskUmbilicalExternalClient llapClient =
       new LlapTaskUmbilicalExternalClient(job, submitWorkInfo.getTokenIdentifier(),
@@ -92,16 +101,13 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
     List<TezEvent> tezEventList = Lists.newArrayList();
     tezEventList.add(tezEvent);
 
-    // this is just the portion that sets up the io to receive data
-    String host = split.getLocations()[0];
-
     llapClient.submitWork(submitWorkRequestProto, host, llapSubmitPort, tezEventList);
 
     String id = HiveConf.getVar(job, HiveConf.ConfVars.HIVEQUERYID) + "_" + llapSplit.getSplitNum();
 
     HiveConf conf = new HiveConf();
     Socket socket = new Socket(host,
-        conf.getIntVar(HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT));
+        serviceInstance.getOutputFormatPort());
 
     LOG.debug("Socket connected");
 
@@ -119,6 +125,66 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
     throw new IOException("These are not the splits you are looking for.");
   }
 
+  private ServiceInstance getServiceInstance(JobConf job, LlapInputSplit llapSplit) throws IOException {
+    LlapRegistryService registryService = LlapRegistryService.getClient(job);
+    String host = llapSplit.getLocations()[0];
+
+    ServiceInstance serviceInstance = getServiceInstanceForHost(registryService, host);
+    if (serviceInstance == null) {
+      throw new IOException("No service instances found for " + host + " in registry");
+    }
+
+    return serviceInstance;
+  }
+
+  private ServiceInstance getServiceInstanceForHost(LlapRegistryService registryService, String host) throws IOException {
+    InetAddress address = InetAddress.getByName(host);
+    ServiceInstanceSet instanceSet = registryService.getInstances();
+    ServiceInstance serviceInstance = null;
+
+    // The name used in the service registry may not match the host name we're using.
+    // Try hostname/canonical hostname/host address
+
+    String name = address.getHostName();
+    LOG.info("Searching service instance by hostname " + name);
+    serviceInstance = selectServiceInstance(instanceSet.getByHost(name));
+    if (serviceInstance != null) {
+      return serviceInstance;
+    }
+
+    name = address.getCanonicalHostName();
+    LOG.info("Searching service instance by canonical hostname " + name);
+    serviceInstance = selectServiceInstance(instanceSet.getByHost(name));
+    if (serviceInstance != null) {
+      return serviceInstance;
+    }
+
+    name = address.getHostAddress();
+    LOG.info("Searching service instance by address " + name);
+    serviceInstance = selectServiceInstance(instanceSet.getByHost(name));
+    if (serviceInstance != null) {
+      return serviceInstance;
+    }
+
+    return serviceInstance;
+  }
+
+  private ServiceInstance selectServiceInstance(Set<ServiceInstance> serviceInstances) {
+    if (serviceInstances == null || serviceInstances.isEmpty()) {
+      return null;
+    }
+
+    // Get the first live service instance
+    for (ServiceInstance serviceInstance : serviceInstances) {
+      if (serviceInstance.isAlive()) {
+        return serviceInstance;
+      }
+    }
+
+    LOG.info("No live service instances were found");
+    return null;
+  }
+
   private SubmitWorkRequestProto constructSubmitWorkRequestProto(SubmitWorkInfo submitWorkInfo,
       int taskNum,
       InetSocketAddress address,