You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2015/12/10 01:03:42 UTC
[1/2] hive git commit: HIVE-12341 : LLAP: add security to daemon
protocol endpoint (excluding shuffle) (Sergey Shelukhin,
reviewed by Siddharth Seth, Lefty Leverenz)
Repository: hive
Updated Branches:
refs/heads/master 74e5c75e4 -> 915587b8c
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java
index 784c631..db0b752 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java
@@ -16,17 +16,26 @@ package org.apache.hadoop.hive.llap.daemon.impl;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.security.PrivilegedAction;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
import com.google.protobuf.BlockingService;
+import com.google.protobuf.ByteString;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
@@ -35,40 +44,52 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWor
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.hive.llap.security.LlapSecurityHelper;
+import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
+import org.apache.hadoop.hive.llap.security.SecretManager;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB;
+import org.apache.hadoop.hive.llap.daemon.LlapManagementProtocolBlockingPB;
+import org.apache.hadoop.hive.llap.security.LlapDaemonPolicyProvider;
public class LlapDaemonProtocolServerImpl extends AbstractService
- implements LlapDaemonProtocolBlockingPB {
+ implements LlapDaemonProtocolBlockingPB, LlapManagementProtocolBlockingPB {
private static final Logger LOG = LoggerFactory.getLogger(LlapDaemonProtocolServerImpl.class);
private final int numHandlers;
private final ContainerRunner containerRunner;
- private final int configuredPort;
- private RPC.Server server;
- private final AtomicReference<InetSocketAddress> bindAddress;
-
+ private final int srvPort, mngPort;
+ private RPC.Server server, mngServer;
+ private final AtomicReference<InetSocketAddress> srvAddress, mngAddress;
+ private SecretManager zkSecretManager;
public LlapDaemonProtocolServerImpl(int numHandlers,
ContainerRunner containerRunner,
- AtomicReference<InetSocketAddress> address,
- int configuredPort) {
+ AtomicReference<InetSocketAddress> srvAddress,
+ AtomicReference<InetSocketAddress> mngAddress,
+ int srvPort,
+ int mngPort) {
super("LlapDaemonProtocolServerImpl");
this.numHandlers = numHandlers;
this.containerRunner = containerRunner;
- this.bindAddress = address;
- this.configuredPort = configuredPort;
+ this.srvAddress = srvAddress;
+ this.srvPort = srvPort;
+ this.mngAddress = mngAddress;
+ this.mngPort = mngPort;
LOG.info("Creating: " + LlapDaemonProtocolServerImpl.class.getSimpleName() +
- " with port configured to: " + configuredPort);
+ " with port configured to: " + srvPort);
}
@Override
- public SubmitWorkResponseProto submitWork (RpcController controller,
+ public SubmitWorkResponseProto submitWork(RpcController controller,
SubmitWorkRequestProto request) throws
ServiceException {
try {
@@ -81,16 +102,14 @@ public class LlapDaemonProtocolServerImpl extends AbstractService
@Override
public SourceStateUpdatedResponseProto sourceStateUpdated(RpcController controller,
- SourceStateUpdatedRequestProto request) throws
- ServiceException {
+ SourceStateUpdatedRequestProto request) throws ServiceException {
containerRunner.sourceStateUpdated(request);
return SourceStateUpdatedResponseProto.getDefaultInstance();
}
@Override
public QueryCompleteResponseProto queryComplete(RpcController controller,
- QueryCompleteRequestProto request) throws
- ServiceException {
+ QueryCompleteRequestProto request) throws ServiceException {
containerRunner.queryComplete(request);
return QueryCompleteResponseProto.getDefaultInstance();
}
@@ -105,24 +124,62 @@ public class LlapDaemonProtocolServerImpl extends AbstractService
@Override
public void serviceStart() {
- Configuration conf = getConfig();
+ final Configuration conf = getConfig();
+ final BlockingService daemonImpl =
+ LlapDaemonProtocolProtos.LlapDaemonProtocol.newReflectiveBlockingService(this);
+ final BlockingService managementImpl =
+ LlapDaemonProtocolProtos.LlapManagementProtocol.newReflectiveBlockingService(this);
+ if (!UserGroupInformation.isSecurityEnabled()) {
+ startProtocolServers(conf, daemonImpl, managementImpl);
+ return;
+ }
+ String llapPrincipal = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_PRINCIPAL),
+ llapKeytab = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE);
+ zkSecretManager = SecretManager.createSecretManager(conf, llapPrincipal, llapKeytab);
- InetSocketAddress addr = new InetSocketAddress(configuredPort);
+ // Start the protocol server after properly authenticating with daemon keytab.
+ UserGroupInformation daemonUgi = null;
try {
- server = createServer(LlapDaemonProtocolBlockingPB.class, addr, conf, numHandlers,
- LlapDaemonProtocolProtos.LlapDaemonProtocol.newReflectiveBlockingService(this));
+ daemonUgi = LlapSecurityHelper.loginWithKerberos(llapPrincipal, llapKeytab);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ daemonUgi.doAs(new PrivilegedAction<Void>() {
+ @Override
+ public Void run() {
+ startProtocolServers(conf, daemonImpl, managementImpl);
+ return null;
+ }
+ });
+ }
+
+ private void startProtocolServers(
+ Configuration conf, BlockingService daemonImpl, BlockingService managementImpl) {
+ server = startProtocolServer(srvPort, numHandlers, srvAddress, conf, daemonImpl,
+ LlapDaemonProtocolBlockingPB.class);
+ mngServer = startProtocolServer(mngPort, 2, mngAddress, conf, managementImpl,
+ LlapManagementProtocolBlockingPB.class);
+ }
+
+ private RPC.Server startProtocolServer(int srvPort, int numHandlers,
+ AtomicReference<InetSocketAddress> bindAddress, Configuration conf,
+ BlockingService impl, Class<?> protocolClass) {
+ InetSocketAddress addr = new InetSocketAddress(srvPort);
+ RPC.Server server;
+ try {
+ server = createServer(protocolClass, addr, conf, numHandlers, impl);
server.start();
} catch (IOException e) {
- LOG.error("Failed to run RPC Server on port: " + configuredPort, e);
+ LOG.error("Failed to run RPC Server on port: " + srvPort, e);
throw new RuntimeException(e);
}
InetSocketAddress serverBindAddress = NetUtils.getConnectAddress(server);
- this.bindAddress.set(NetUtils.createSocketAddrForHost(
+ bindAddress.set(NetUtils.createSocketAddrForHost(
serverBindAddress.getAddress().getCanonicalHostName(),
serverBindAddress.getPort()));
- LOG.info("Instantiated " + LlapDaemonProtocolBlockingPB.class.getSimpleName() + " at " +
- bindAddress);
+ LOG.info("Instantiated " + protocolClass.getSimpleName() + " at " + bindAddress);
+ return server;
}
@Override
@@ -130,26 +187,68 @@ public class LlapDaemonProtocolServerImpl extends AbstractService
if (server != null) {
server.stop();
}
+ if (mngServer != null) {
+ mngServer.stop();
+ }
}
@InterfaceAudience.Private
@VisibleForTesting
InetSocketAddress getBindAddress() {
- return bindAddress.get();
+ return srvAddress.get();
}
private RPC.Server createServer(Class<?> pbProtocol, InetSocketAddress addr, Configuration conf,
int numHandlers, BlockingService blockingService) throws
IOException {
RPC.setProtocolEngine(conf, pbProtocol, ProtobufRpcEngine.class);
- RPC.Server server = new RPC.Builder(conf)
+ RPC.Builder builder = new RPC.Builder(conf)
.setProtocol(pbProtocol)
.setInstance(blockingService)
.setBindAddress(addr.getHostName())
.setPort(addr.getPort())
- .setNumHandlers(numHandlers)
- .build();
- // TODO Add security.
+ .setNumHandlers(numHandlers);
+ if (zkSecretManager != null) {
+ builder = builder.setSecretManager(zkSecretManager);
+ }
+ RPC.Server server = builder.build();
+
+ if (conf.getBoolean(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) {
+ server.refreshServiceAcl(conf, new LlapDaemonPolicyProvider());
+ }
return server;
}
+
+ @Override
+ public GetTokenResponseProto getDelegationToken(RpcController controller,
+ GetTokenRequestProto request) throws ServiceException {
+ if (zkSecretManager == null) {
+ throw new ServiceException("Operation not supported on unsecure cluster");
+ }
+ UserGroupInformation ugi;
+ try {
+ ugi = UserGroupInformation.getCurrentUser();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ String user = ugi.getUserName();
+ Text owner = new Text(user);
+ Text realUser = null;
+ if (ugi.getRealUser() != null) {
+ realUser = new Text(ugi.getRealUser().getUserName());
+ }
+ Text renewer = new Text(ugi.getShortUserName());
+ LlapTokenIdentifier llapId = new LlapTokenIdentifier(owner, renewer, realUser);
+ // TODO: note that the token is not renewable right now and will last for 2 weeks by default.
+ Token<LlapTokenIdentifier> token = new Token<LlapTokenIdentifier>(llapId, zkSecretManager);
+ ByteArrayDataOutput out = ByteStreams.newDataOutput();
+ try {
+ token.write(out);
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ ByteString bs = ByteString.copyFrom(out.toByteArray());
+ GetTokenResponseProto response = GetTokenResponseProto.newBuilder().setToken(bs).build();
+ return response;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapManagementProtocolClientImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapManagementProtocolClientImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapManagementProtocolClientImpl.java
new file mode 100644
index 0000000..e293a95
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapManagementProtocolClientImpl.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+import javax.annotation.Nullable;
+import javax.net.SocketFactory;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.ProtocolProxy;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.hive.llap.daemon.LlapManagementProtocolBlockingPB;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto;
+import org.apache.hadoop.security.UserGroupInformation;
+
+public class LlapManagementProtocolClientImpl implements LlapManagementProtocolBlockingPB {
+
+ private final Configuration conf;
+ private final InetSocketAddress serverAddr;
+ private final RetryPolicy retryPolicy;
+ private final SocketFactory socketFactory;
+ LlapManagementProtocolBlockingPB proxy;
+
+
+ public LlapManagementProtocolClientImpl(Configuration conf, String hostname, int port,
+ @Nullable RetryPolicy retryPolicy,
+ @Nullable SocketFactory socketFactory) {
+ this.conf = conf;
+ this.serverAddr = NetUtils.createSocketAddr(hostname, port);
+ this.retryPolicy = retryPolicy;
+ if (socketFactory == null) {
+ this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
+ } else {
+ this.socketFactory = socketFactory;
+ }
+ }
+
+ public LlapManagementProtocolBlockingPB getProxy() throws IOException {
+ if (proxy == null) {
+ proxy = createProxy();
+ }
+ return proxy;
+ }
+
+ public LlapManagementProtocolBlockingPB createProxy() throws IOException {
+ RPC.setProtocolEngine(conf, LlapManagementProtocolBlockingPB.class, ProtobufRpcEngine.class);
+ ProtocolProxy<LlapManagementProtocolBlockingPB> proxy =
+ RPC.getProtocolProxy(LlapManagementProtocolBlockingPB.class, 0, serverAddr,
+ UserGroupInformation.getCurrentUser(), conf, NetUtils.getDefaultSocketFactory(conf), 0,
+ retryPolicy);
+ return proxy.getProxy();
+ }
+
+ @Override
+ public GetTokenResponseProto getDelegationToken(RpcController controller,
+ GetTokenRequestProto request) throws ServiceException {
+ try {
+ return getProxy().getDelegationToken(null, request);
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java b/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
index fae7654..9549567 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
@@ -18,11 +18,14 @@ import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.security.token.TokenInfo;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
+import org.apache.tez.runtime.common.security.JobTokenSelector;
+@TokenInfo(JobTokenSelector.class)
public interface LlapTaskUmbilicalProtocol extends VersionedProtocol {
public static final long versionID = 1L;
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapDaemonPolicyProvider.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapDaemonPolicyProvider.java b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapDaemonPolicyProvider.java
new file mode 100644
index 0000000..d67647b
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapDaemonPolicyProvider.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.security;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB;
+import org.apache.hadoop.hive.llap.daemon.LlapManagementProtocolBlockingPB;
+import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.security.authorize.Service;
+
+public class LlapDaemonPolicyProvider extends PolicyProvider {
+ private static final Service[] services = new Service[] {
+ new Service(HiveConf.ConfVars.LLAP_SECURITY_ACL.varname,
+ LlapDaemonProtocolBlockingPB.class),
+ new Service(HiveConf.ConfVars.LLAP_MANAGEMENT_ACL.varname,
+ LlapManagementProtocolBlockingPB.class)
+ };
+
+ @Override
+ public Service[] getServices() {
+ return services;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java
new file mode 100644
index 0000000..a00b631
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.security;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.SocketFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.daemon.impl.LlapManagementProtocolClientImpl;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto;
+import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
+import org.apache.hadoop.hive.llap.registry.ServiceInstance;
+import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.ByteString;
+
+/** Individual instances of this class are not thread safe. */
+public class LlapSecurityHelper implements LlapTokenProvider {
+ private static final Logger LOG = LoggerFactory.getLogger(LlapSecurityHelper.class);
+
+ private UserGroupInformation llapUgi;
+
+ private final LlapRegistryService registry;
+ private ServiceInstanceSet activeInstances;
+ private final Configuration conf;
+ private LlapManagementProtocolClientImpl client;
+
+ private final SocketFactory socketFactory;
+ private final RetryPolicy retryPolicy;
+
+ public LlapSecurityHelper(Configuration conf) {
+ this.conf = conf;
+ registry = new LlapRegistryService(false);
+ registry.init(conf);
+ socketFactory = NetUtils.getDefaultSocketFactory(conf);
+ retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep(
+ 16000, 2000l, TimeUnit.MILLISECONDS);
+ }
+
+ public static UserGroupInformation loginWithKerberos(
+ String principal, String keytabFile) throws IOException {
+ if (!UserGroupInformation.isSecurityEnabled()) return null;
+ if (principal.isEmpty() || keytabFile.isEmpty()) {
+ throw new RuntimeException("Kerberos principal and/or keytab are empty");
+ }
+ LOG.info("Logging in as " + principal + " via " + keytabFile);
+ UserGroupInformation.loginUserFromKeytab(
+ SecurityUtil.getServerPrincipal(principal, "0.0.0.0"), keytabFile);
+ return UserGroupInformation.getLoginUser();
+ }
+
+ @Override
+ public Token<LlapTokenIdentifier> getDelegationToken() throws IOException {
+ if (!UserGroupInformation.isSecurityEnabled()) return null;
+ if (llapUgi == null) {
+ llapUgi = UserGroupInformation.getCurrentUser();
+ // We could have also added keytab support; right now client must do smth like kinit.
+ }
+ Iterator<ServiceInstance> llaps = null;
+ ServiceInstance someLlap = null;
+ if (client == null) {
+ llaps = getLlapServices(false);
+ someLlap = llaps.next();
+ }
+
+ ByteString tokenBytes = null;
+ boolean hasRefreshed = false;
+ while (true) {
+ try {
+ tokenBytes = getTokenBytes(someLlap);
+ break;
+ } catch (InterruptedException ie) {
+ throw new RuntimeException(ie);
+ } catch (IOException ex) {
+ LOG.error("Cannot get a token, trying a different instance", ex);
+ client = null;
+ }
+ if (llaps == null || !llaps.hasNext()) {
+ if (hasRefreshed) { // Only refresh once.
+ throw new RuntimeException("Cannot find any LLAPs to get the token from");
+ }
+ llaps = getLlapServices(true);
+ hasRefreshed = true;
+ }
+ someLlap = llaps.next();
+ }
+
+ // Stupid protobuf byte-buffer reinvention.
+ Token<LlapTokenIdentifier> token = new Token<>();
+ DataInputByteBuffer in = new DataInputByteBuffer();
+ in.reset(tokenBytes.asReadOnlyByteBuffer());
+ token.readFields(in);
+ return token;
+ }
+
+ private ByteString getTokenBytes(
+ final ServiceInstance si) throws InterruptedException, IOException {
+ return llapUgi.doAs(new PrivilegedExceptionAction<ByteString>() {
+ @Override
+ public ByteString run() throws Exception {
+ if (client == null) {
+ client = new LlapManagementProtocolClientImpl(
+ conf, si.getHost(), si.getManagementPort(), retryPolicy, socketFactory);
+ }
+ // Client only connects on the first call, so this has to be done in doAs.
+ GetTokenRequestProto req = GetTokenRequestProto.newBuilder().build();
+ return client.getDelegationToken(null, req).getToken();
+ }
+ });
+ }
+
+ private Iterator<ServiceInstance> getLlapServices(boolean doForceRefresh) throws IOException {
+ if (activeInstances == null) {
+ registry.start();
+ activeInstances = registry.getInstances();
+ }
+ Map<String, ServiceInstance> daemons = activeInstances.getAll();
+ if (doForceRefresh || daemons == null || daemons.isEmpty()) {
+ activeInstances.refresh();
+ daemons = activeInstances.getAll();
+ if (daemons == null || daemons.isEmpty()) throw new RuntimeException("No LLAPs found");
+ }
+ return daemons.values().iterator();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapServerSecurityInfo.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapServerSecurityInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapServerSecurityInfo.java
new file mode 100644
index 0000000..4dca2ce
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapServerSecurityInfo.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.security;
+
+import java.lang.annotation.Annotation;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB;
+import org.apache.hadoop.hive.llap.daemon.LlapManagementProtocolBlockingPB;
+import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenInfo;
+import org.apache.hadoop.security.token.TokenSelector;
+
+public class LlapServerSecurityInfo extends SecurityInfo {
+ private static final Log LOG = LogFactory.getLog(LlapServerSecurityInfo.class);
+
+ @Override
+ public KerberosInfo getKerberosInfo(Class<?> protocol, Configuration conf) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Trying to get KerberosInfo for " + protocol);
+ }
+ if (!LlapDaemonProtocolBlockingPB.class.isAssignableFrom(protocol)
+ && !LlapManagementProtocolBlockingPB.class.isAssignableFrom(protocol)) return null;
+ return new KerberosInfo() {
+ @Override
+ public Class<? extends Annotation> annotationType() {
+ return null;
+ }
+
+ @Override
+ public String serverPrincipal() {
+ return HiveConf.ConfVars.LLAP_KERBEROS_PRINCIPAL.varname;
+ }
+
+ @Override
+ public String clientPrincipal() {
+ return null;
+ }
+ };
+ }
+
+ @Override
+ public TokenInfo getTokenInfo(Class<?> protocol, Configuration conf) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Trying to get TokenInfo for " + protocol);
+ }
+ // Tokens cannot be used for the management protocol (for now).
+ if (!LlapDaemonProtocolBlockingPB.class.isAssignableFrom(protocol)) return null;
+ return new TokenInfo() {
+ @Override
+ public Class<? extends Annotation> annotationType() {
+ return null;
+ }
+
+ @Override
+ public Class<? extends TokenSelector<? extends TokenIdentifier>> value() {
+ return LlapTokenSelector.class;
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapTokenSelector.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapTokenSelector.java b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapTokenSelector.java
new file mode 100644
index 0000000..b6e7499
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapTokenSelector.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.security;
+
+import java.util.Collection;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+
+public class LlapTokenSelector implements TokenSelector<LlapTokenIdentifier> {
+ private static final Log LOG = LogFactory.getLog(LlapTokenSelector.class);
+
+ @Override
+ public Token<LlapTokenIdentifier> selectToken(Text service,
+ Collection<Token<? extends TokenIdentifier>> tokens) {
+ if (service == null) return null;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Looking for a token with service " + service);
+ }
+ for (Token<? extends TokenIdentifier> token : tokens) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Token = " + token.getKind() + "; service = " + token.getService());
+ }
+ if (LlapTokenIdentifier.KIND_NAME.equals(token.getKind())
+ && service.equals(token.getService())) {
+ @SuppressWarnings("unchecked")
+ Token<LlapTokenIdentifier> result = (Token<LlapTokenIdentifier>)token;
+ return result;
+ }
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-server/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
new file mode 100644
index 0000000..dc4e81a
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.security;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
+
+public class SecretManager extends ZKDelegationTokenSecretManager<LlapTokenIdentifier> {
+ public SecretManager(Configuration conf) {
+ super(conf);
+ }
+
+ @Override
+ public LlapTokenIdentifier createIdentifier() {
+ return new LlapTokenIdentifier();
+ }
+
+ @Override
+ public LlapTokenIdentifier decodeTokenIdentifier(
+ Token<LlapTokenIdentifier> token) throws IOException {
+ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(token.getIdentifier()));
+ LlapTokenIdentifier id = new LlapTokenIdentifier();
+ id.readFields(dis);
+ dis.close();
+ return id;
+ }
+
+ public static SecretManager createSecretManager(
+ final Configuration conf, String llapPrincipal, String llapKeytab) {
+ // Create ZK connection under a separate ugi (if specified) - ZK works in mysterious ways.
+ UserGroupInformation zkUgi = null;
+ String principal = HiveConf.getVar(conf, ConfVars.LLAP_ZKSM_KERBEROS_PRINCIPAL, llapPrincipal);
+ String keyTab = HiveConf.getVar(conf, ConfVars.LLAP_ZKSM_KERBEROS_KEYTAB_FILE, llapKeytab);
+ try {
+ zkUgi = LlapSecurityHelper.loginWithKerberos(principal, keyTab);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ // Override the default delegation token lifetime for LLAP.
+ // Also set all the necessary ZK settings to defaults and LLAP configs, if not set.
+ final Configuration zkConf = new Configuration(conf);
+ zkConf.setLong(DelegationTokenManager.MAX_LIFETIME,
+ HiveConf.getTimeVar(conf, ConfVars.LLAP_DELEGATION_TOKEN_LIFETIME, TimeUnit.SECONDS));
+ zkConf.set(SecretManager.ZK_DTSM_ZK_KERBEROS_PRINCIPAL, principal);
+ zkConf.set(SecretManager.ZK_DTSM_ZK_KERBEROS_KEYTAB, keyTab);
+ setZkConfIfNotSet(zkConf, SecretManager.ZK_DTSM_ZNODE_WORKING_PATH, "llapzkdtsm");
+ setZkConfIfNotSet(zkConf, SecretManager.ZK_DTSM_ZK_AUTH_TYPE, "sasl");
+ setZkConfIfNotSet(zkConf, SecretManager.ZK_DTSM_ZK_CONNECTION_STRING,
+ HiveConf.getVar(zkConf, ConfVars.LLAP_ZKSM_ZK_CONNECTION_STRING));
+ return zkUgi.doAs(new PrivilegedAction<SecretManager>() {
+ @Override
+ public SecretManager run() {
+ SecretManager zkSecretManager = new SecretManager(zkConf);
+ try {
+ zkSecretManager.startThreads();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return zkSecretManager;
+ }
+ });
+ }
+
+ private static void setZkConfIfNotSet(Configuration zkConf, String name, String value) {
+ if (zkConf.get(name) != null) return;
+ zkConf.set(name, value);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index b93650d..ce248e9 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -25,11 +25,13 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.base.Preconditions;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.protobuf.ByteString;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.LlapNodeId;
@@ -44,6 +46,7 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWor
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto;
import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
+import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
import org.apache.hadoop.hive.llap.tezplugins.helpers.SourceStateTracker;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
@@ -52,6 +55,8 @@ import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -88,12 +93,23 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
private TaskCommunicator communicator;
private long deleteDelayOnDagComplete;
private final LlapTaskUmbilicalProtocol umbilical;
+ private final Token<LlapTokenIdentifier> token;
private volatile String currentDagName;
public LlapTaskCommunicator(
TaskCommunicatorContext taskCommunicatorContext) {
super(taskCommunicatorContext);
+ Credentials credentials = taskCommunicatorContext.getCredentials();
+ if (credentials != null) {
+ @SuppressWarnings("unchecked")
+ Token<LlapTokenIdentifier> llapToken =
+ (Token<LlapTokenIdentifier>)credentials.getToken(LlapTokenIdentifier.KIND_NAME);
+ this.token = llapToken;
+ } else {
+ this.token = null;
+ }
+ Preconditions.checkState((token != null) == UserGroupInformation.isSecurityEnabled());
umbilical = new LlapTaskUmbilicalProtocolImpl(getUmbilical());
SubmitWorkRequestProto.Builder baseBuilder = SubmitWorkRequestProto.newBuilder();
@@ -117,7 +133,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
super.initialize();
Configuration conf = getConf();
int numThreads = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_COMMUNICATOR_NUM_THREADS);
- this.communicator = new TaskCommunicator(numThreads, conf);
+ this.communicator = new TaskCommunicator(numThreads, conf, token);
this.deleteDelayOnDagComplete = HiveConf.getTimeVar(
conf, ConfVars.LLAP_FILE_CLEANUP_DELAY_SECONDS, TimeUnit.SECONDS);
LOG.info("Running LlapTaskCommunicator with "
@@ -158,7 +174,9 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
.setNumHandlers(numHandlers)
.setSecretManager(jobTokenSecretManager).build();
- // Do serviceACLs need to be refreshed, like in Tez ?
+ if (conf.getBoolean(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) {
+ server.refreshServiceAcl(conf, new LlapUmbilicalPolicyProvider());
+ }
server.start();
this.address = NetUtils.getConnectAddress(server);
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapUmbilicalPolicyProvider.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapUmbilicalPolicyProvider.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapUmbilicalPolicyProvider.java
new file mode 100644
index 0000000..4102f5b
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapUmbilicalPolicyProvider.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
+import org.apache.hadoop.security.authorize.Service;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.app.security.authorize.TezAMPolicyProvider;
+
+public class LlapUmbilicalPolicyProvider extends TezAMPolicyProvider {
+
+ private static Service[] services;
+ private static final Object servicesLock = new Object();
+
+ @Override
+ public Service[] getServices() {
+ if (services != null) return services;
+ synchronized (servicesLock) {
+ if (services != null) return services;
+ Service[] parentSvc = super.getServices();
+ Service[] result = Arrays.copyOf(parentSvc, parentSvc.length + 1);
+ result[parentSvc.length] = new Service(
+ TezConstants.TEZ_AM_SECURITY_SERVICE_AUTHORIZATION_TASK_UMBILICAL,
+ LlapTaskUmbilicalProtocol.class);
+ return (services = result);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java
index 8144165..f9ca677 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java
@@ -15,6 +15,11 @@
package org.apache.hadoop.hive.llap.tezplugins;
import javax.net.SocketFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.security.PrivilegedAction;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
@@ -55,9 +60,13 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWor
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto;
+import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.AbstractService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,12 +83,14 @@ public class TaskCommunicator extends AbstractService {
private final ListeningExecutorService requestManagerExecutor;
private volatile ListenableFuture<Void> requestManagerFuture;
+ private final Token<LlapTokenIdentifier> llapToken;
-
- public TaskCommunicator(int numThreads, Configuration conf) {
+ public TaskCommunicator(
+ int numThreads, Configuration conf, Token<LlapTokenIdentifier> llapToken) {
super(TaskCommunicator.class.getSimpleName());
this.hostProxies = new ConcurrentHashMap<>();
this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
+ this.llapToken = llapToken;
long connectionTimeout = HiveConf.getTimeVar(conf,
ConfVars.LLAP_TASK_COMMUNICATOR_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
@@ -458,13 +469,34 @@ public class TaskCommunicator extends AbstractService {
void indicateError(Throwable t);
}
- private LlapDaemonProtocolBlockingPB getProxy(LlapNodeId nodeId) {
+ private LlapDaemonProtocolBlockingPB getProxy(final LlapNodeId nodeId) {
String hostId = getHostIdentifier(nodeId.getHostname(), nodeId.getPort());
LlapDaemonProtocolBlockingPB proxy = hostProxies.get(hostId);
if (proxy == null) {
- proxy = new LlapDaemonProtocolClientImpl(getConfig(), nodeId.getHostname(), nodeId.getPort(),
- retryPolicy, socketFactory);
+ if (llapToken == null) {
+ proxy = new LlapDaemonProtocolClientImpl(getConfig(), nodeId.getHostname(),
+ nodeId.getPort(), retryPolicy, socketFactory);
+ } else {
+ UserGroupInformation ugi;
+ try {
+ ugi = UserGroupInformation.getCurrentUser();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ Token<LlapTokenIdentifier> nodeToken = new Token<LlapTokenIdentifier>(llapToken);
+ SecurityUtil.setTokenService(nodeToken, NetUtils.createSocketAddrForHost(
+ nodeId.getHostname(), nodeId.getPort()));
+ ugi.addToken(nodeToken);
+ proxy = ugi.doAs(new PrivilegedAction<LlapDaemonProtocolBlockingPB>() {
+ @Override
+ public LlapDaemonProtocolBlockingPB run() {
+ return new LlapDaemonProtocolClientImpl(getConfig(), nodeId.getHostname(),
+ nodeId.getPort(), retryPolicy, socketFactory);
+ }
+ });
+ }
+
LlapDaemonProtocolBlockingPB proxyOld = hostProxies.putIfAbsent(hostId, proxy);
if (proxyOld != null) {
// TODO Shutdown the new proxy.
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-server/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo
----------------------------------------------------------------------
diff --git a/llap-server/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo b/llap-server/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo
new file mode 100644
index 0000000..dcc6988
--- /dev/null
+++ b/llap-server/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo
@@ -0,0 +1,14 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+org.apache.hadoop.hive.llap.security.LlapServerSecurityInfo
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-server/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
----------------------------------------------------------------------
diff --git a/llap-server/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer b/llap-server/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
new file mode 100644
index 0000000..e80ac41
--- /dev/null
+++ b/llap-server/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
@@ -0,0 +1,14 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+org.apache.hadoop.hive.llap.security.LlapTokenIdentifier.Renewer
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-server/src/protobuf/LlapDaemonProtocol.proto
----------------------------------------------------------------------
diff --git a/llap-server/src/protobuf/LlapDaemonProtocol.proto b/llap-server/src/protobuf/LlapDaemonProtocol.proto
index 0ba6acf..07721df 100644
--- a/llap-server/src/protobuf/LlapDaemonProtocol.proto
+++ b/llap-server/src/protobuf/LlapDaemonProtocol.proto
@@ -117,9 +117,20 @@ message TerminateFragmentRequestProto {
message TerminateFragmentResponseProto {
}
+message GetTokenRequestProto {
+}
+
+message GetTokenResponseProto {
+ optional bytes token = 1;
+}
+
service LlapDaemonProtocol {
rpc submitWork(SubmitWorkRequestProto) returns (SubmitWorkResponseProto);
rpc sourceStateUpdated(SourceStateUpdatedRequestProto) returns (SourceStateUpdatedResponseProto);
rpc queryComplete(QueryCompleteRequestProto) returns (QueryCompleteResponseProto);
rpc terminateFragment(TerminateFragmentRequestProto) returns (TerminateFragmentResponseProto);
}
+
+service LlapManagementProtocol {
+ rpc getDelegationToken(GetTokenRequestProto) returns (GetTokenResponseProto);
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
index 52ba360..deade5f 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
@@ -143,7 +143,7 @@ public class MiniLlapCluster extends AbstractService {
@Override
public void serviceInit(Configuration conf) {
llapDaemon = new LlapDaemon(conf, numExecutorsPerService, execBytesPerService, llapIoEnabled,
- ioIsDirect, ioBytesPerService, localDirs, 0, 0);
+ ioIsDirect, ioBytesPerService, localDirs, 0, 0, 0);
llapDaemon.init(conf);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
index bf8a673..0006a9a 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
@@ -39,7 +39,8 @@ public class TestLlapDaemonProtocolServerImpl {
int numHandlers = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_RPC_NUM_HANDLERS);
LlapDaemonProtocolServerImpl server =
new LlapDaemonProtocolServerImpl(numHandlers, mock(ContainerRunner.class),
- new AtomicReference<InetSocketAddress>(), rpcPort);
+ new AtomicReference<InetSocketAddress>(), new AtomicReference<InetSocketAddress>(),
+ rpcPort, rpcPort + 1);
try {
server.init(new Configuration());
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java
----------------------------------------------------------------------
diff --git a/ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java b/ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java
index f4cc240..5202476 100644
--- a/ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java
+++ b/ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java
@@ -17765,6 +17765,7 @@ public final class OrcProto {
* Version of the writer:
* 0 (or missing) = original
* 1 = HIVE-8732 fixed
+ * 2 = HIVE-4243 fixed
* </pre>
*/
boolean hasWriterVersion();
@@ -17775,6 +17776,7 @@ public final class OrcProto {
* Version of the writer:
* 0 (or missing) = original
* 1 = HIVE-8732 fixed
+ * 2 = HIVE-4243 fixed
* </pre>
*/
int getWriterVersion();
@@ -18077,6 +18079,7 @@ public final class OrcProto {
* Version of the writer:
* 0 (or missing) = original
* 1 = HIVE-8732 fixed
+ * 2 = HIVE-4243 fixed
* </pre>
*/
public boolean hasWriterVersion() {
@@ -18089,6 +18092,7 @@ public final class OrcProto {
* Version of the writer:
* 0 (or missing) = original
* 1 = HIVE-8732 fixed
+ * 2 = HIVE-4243 fixed
* </pre>
*/
public int getWriterVersion() {
@@ -18759,6 +18763,7 @@ public final class OrcProto {
* Version of the writer:
* 0 (or missing) = original
* 1 = HIVE-8732 fixed
+ * 2 = HIVE-4243 fixed
* </pre>
*/
public boolean hasWriterVersion() {
@@ -18771,6 +18776,7 @@ public final class OrcProto {
* Version of the writer:
* 0 (or missing) = original
* 1 = HIVE-8732 fixed
+ * 2 = HIVE-4243 fixed
* </pre>
*/
public int getWriterVersion() {
@@ -18783,6 +18789,7 @@ public final class OrcProto {
* Version of the writer:
* 0 (or missing) = original
* 1 = HIVE-8732 fixed
+ * 2 = HIVE-4243 fixed
* </pre>
*/
public Builder setWriterVersion(int value) {
@@ -18798,6 +18805,7 @@ public final class OrcProto {
* Version of the writer:
* 0 (or missing) = original
* 1 = HIVE-8732 fixed
+ * 2 = HIVE-4243 fixed
* </pre>
*/
public Builder clearWriterVersion() {
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 1a9469a..4fb6c00 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -2993,7 +2993,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
cols.addAll(tbl.getPartCols());
}
} else {
- cols = Hive.getFieldsFromDeserializer(colPath, deserializer); // TODO#: here - desc
+ cols = Hive.getFieldsFromDeserializer(colPath, deserializer);
if (descTbl.isFormatted()) {
// when column name is specified in describe table DDL, colPath will
// will be table_name.column_name
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/ql/src/java/org/apache/hadoop/hive/ql/exec/GlobalWorkMapFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/GlobalWorkMapFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/GlobalWorkMapFactory.java
index 7c38dc3..338e495 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/GlobalWorkMapFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/GlobalWorkMapFactory.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.llap.io.api.LlapIoProxy;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
public class GlobalWorkMapFactory {
@@ -99,7 +99,7 @@ public class GlobalWorkMapFactory {
DummyMap<Path, BaseWork> dummy = new DummyMap<Path, BaseWork>();
public Map<Path, BaseWork> get(Configuration conf) {
- if (LlapIoProxy.isDaemon()
+ if (LlapProxy.isDaemon()
|| (SessionState.get() != null && SessionState.get().isHiveServerQuery())
|| HiveConf.getVar(conf, ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
if (threadLocalWorkMap == null) {
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
index 3d9771a..5201120 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
@@ -24,8 +24,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.io.api.LlapIoProxy;
-import org.apache.hadoop.hive.llap.io.api.LlapIoProxy;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
import org.apache.hadoop.hive.ql.exec.tez.LlapObjectCache;
/**
@@ -46,7 +46,7 @@ public class ObjectCacheFactory {
*/
public static ObjectCache getCache(Configuration conf, String queryId) {
if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
- if (LlapIoProxy.isDaemon()) { // daemon
+ if (LlapProxy.isDaemon()) { // daemon
if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_OBJECT_CACHE_ENABLED)) {
// LLAP object cache, unlike others, does not use globals. Thus, get the existing one.
return getLlapObjectCache(queryId);
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index 914b4e7..ee62ab3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -34,7 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.io.api.LlapIoProxy;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
import org.apache.hadoop.hive.ql.exec.MapOperator;
@@ -96,7 +96,7 @@ public class MapRecordProcessor extends RecordProcessor {
public MapRecordProcessor(final JobConf jconf, final ProcessorContext context) throws Exception {
super(jconf, context);
String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
- if (LlapIoProxy.isDaemon()) { // do not cache plan
+ if (LlapProxy.isDaemon()) { // do not cache plan
cache = new org.apache.hadoop.hive.ql.exec.mr.ObjectCache();
} else {
cache = ObjectCacheFactory.getCache(jconf, queryId);
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index efcf88c..0579dbc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -29,7 +29,7 @@ import java.util.concurrent.Callable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.io.api.LlapIoProxy;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
import org.apache.hadoop.hive.ql.exec.MapredContext;
@@ -92,7 +92,7 @@ public class ReduceRecordProcessor extends RecordProcessor{
ObjectCache cache;
String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
- if (LlapIoProxy.isDaemon()) { // don't cache plan
+ if (LlapProxy.isDaemon()) { // don't cache plan
cache = new org.apache.hadoop.hive.ql.exec.mr.ObjectCache();
} else {
cache = ObjectCacheFactory.getCache(jconf, queryId);
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index 07f26be..e1a8041 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hive.ql.exec.tez;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -40,6 +42,7 @@ import java.util.concurrent.TimeoutException;
import javax.security.auth.login.LoginException;
+import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.hadoop.fs.FileStatus;
@@ -48,11 +51,16 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
+import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
+import org.apache.hadoop.hive.llap.security.LlapTokenProvider;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.tez.client.TezClient;
@@ -189,7 +197,8 @@ public class TezSessionState {
this.queueName = conf.get("tez.queue.name");
this.doAsEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS);
- final boolean llapMode = "llap".equals(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE));
+ final boolean llapMode = "llap".equals(HiveConf.getVar(
+ conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE));
UserGroupInformation ugi = Utils.getUGI();
user = ugi.getShortUserName();
@@ -258,19 +267,25 @@ public class TezSessionState {
conf.stripHiddenConfigurations(tezConfig);
ServicePluginsDescriptor servicePluginsDescriptor;
- UserPayload servicePluginPayload = TezUtils.createUserPayloadFromConf(tezConfig);
+ Credentials llapCredentials = null;
if (llapMode) {
+ if (UserGroupInformation.isSecurityEnabled()) {
+ LlapTokenProvider tp = LlapProxy.getOrInitTokenProvider(conf);
+ Token<LlapTokenIdentifier> token = tp.getDelegationToken();
+ LOG.info("Obtained a token: " + token);
+ llapCredentials = new Credentials();
+ llapCredentials.addToken(LlapTokenIdentifier.KIND_NAME, token);
+ }
+ UserPayload servicePluginPayload = TezUtils.createUserPayloadFromConf(tezConfig);
// we need plugins to handle llap and uber mode
servicePluginsDescriptor = ServicePluginsDescriptor.create(true,
- new TaskSchedulerDescriptor[]{
- TaskSchedulerDescriptor.create(LLAP_SERVICE, LLAP_SCHEDULER)
- .setUserPayload(servicePluginPayload)},
- new ContainerLauncherDescriptor[]{
- ContainerLauncherDescriptor.create(LLAP_SERVICE, LLAP_LAUNCHER)},
- new TaskCommunicatorDescriptor[]{
- TaskCommunicatorDescriptor.create(LLAP_SERVICE, LLAP_TASK_COMMUNICATOR)
- .setUserPayload(servicePluginPayload)});
+ new TaskSchedulerDescriptor[] { TaskSchedulerDescriptor.create(
+ LLAP_SERVICE, LLAP_SCHEDULER).setUserPayload(servicePluginPayload) },
+ new ContainerLauncherDescriptor[] { ContainerLauncherDescriptor.create(
+ LLAP_SERVICE, LLAP_LAUNCHER) },
+ new TaskCommunicatorDescriptor[] { TaskCommunicatorDescriptor.create(
+ LLAP_SERVICE, LLAP_TASK_COMMUNICATOR).setUserPayload(servicePluginPayload) });
} else {
servicePluginsDescriptor = ServicePluginsDescriptor.create(true);
}
@@ -286,7 +301,8 @@ public class TezSessionState {
final TezClient session = TezClient.newBuilder("HIVE-" + sessionId, tezConfig)
.setIsSession(true).setLocalResources(commonLocalResources)
- .setServicePluginDescriptor(servicePluginsDescriptor).build();
+ .setCredentials(llapCredentials).setServicePluginDescriptor(servicePluginsDescriptor)
+ .build();
LOG.info("Opening new Tez Session (id: " + sessionId
+ ", scratch dir: " + tezScratchDir + ")");
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index b19c70a..bdf5dc2 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
import org.apache.hadoop.hive.llap.io.api.LlapIo;
-import org.apache.hadoop.hive.llap.io.api.LlapIoProxy;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
import org.apache.hadoop.hive.ql.exec.spark.SparkDynamicPartitionPruner;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.exec.Operator;
@@ -214,7 +214,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
LOG.debug("Wrapping " + inputFormat);
}
@SuppressWarnings("unchecked")
- LlapIo<VectorizedRowBatch> llapIo = LlapIoProxy.getIo();
+ LlapIo<VectorizedRowBatch> llapIo = LlapProxy.getIo();
if (llapIo == null) {
LOG.info("Not using LLAP because IO is not initialized");
return inputFormat;
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/serde/src/java/org/apache/hadoop/hive/serde2/AbstractSerDe.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/AbstractSerDe.java b/serde/src/java/org/apache/hadoop/hive/serde2/AbstractSerDe.java
index 9269ff4..9434e91 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/AbstractSerDe.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/AbstractSerDe.java
@@ -122,6 +122,6 @@ public abstract class AbstractSerDe implements SerDe {
* does, in fact, store it inside metastore, based on table parameters.
*/
public boolean shouldStoreFieldsInMetastore(Map<String, String> tableParams) {
- return false; // The default, unless SerDe overrides it. TODO#
+ return false; // The default, unless SerDe overrides it.
}
}
[2/2] hive git commit: HIVE-12341 : LLAP: add security to daemon
protocol endpoint (excluding shuffle) (Sergey Shelukhin,
reviewed by Siddharth Seth, Lefty Leverenz)
Posted by se...@apache.org.
HIVE-12341 : LLAP: add security to daemon protocol endpoint (excluding shuffle) (Sergey Shelukhin, reviewed by Siddharth Seth, Lefty Leverenz)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/915587b8
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/915587b8
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/915587b8
Branch: refs/heads/master
Commit: 915587b8c0f2b1fd10f2d5c026027b58501b2896
Parents: 74e5c75
Author: Sergey Shelukhin <se...@apache.org>
Authored: Wed Dec 9 15:55:25 2015 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Wed Dec 9 16:00:52 2015 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 46 +-
.../hadoop/hive/llap/io/api/LlapIoProxy.java | 78 --
.../hadoop/hive/llap/io/api/LlapProxy.java | 111 ++
.../hive/llap/registry/ServiceInstance.java | 7 +
.../registry/impl/LlapFixedRegistryImpl.java | 7 +
.../registry/impl/LlapYarnRegistryImpl.java | 30 +-
.../hive/llap/security/LlapTokenIdentifier.java | 82 ++
.../hive/llap/security/LlapTokenProvider.java | 27 +
.../daemon/rpc/LlapDaemonProtocolProtos.java | 1059 +++++++++++++++++-
.../daemon/LlapDaemonProtocolBlockingPB.java | 6 +
.../LlapManagementProtocolBlockingPB.java | 24 +
.../hive/llap/daemon/impl/LlapDaemon.java | 37 +-
.../impl/LlapDaemonProtocolClientImpl.java | 1 -
.../impl/LlapDaemonProtocolServerImpl.java | 155 ++-
.../impl/LlapManagementProtocolClientImpl.java | 82 ++
.../protocol/LlapTaskUmbilicalProtocol.java | 3 +
.../llap/security/LlapDaemonPolicyProvider.java | 38 +
.../hive/llap/security/LlapSecurityHelper.java | 155 +++
.../llap/security/LlapServerSecurityInfo.java | 78 ++
.../hive/llap/security/LlapTokenSelector.java | 53 +
.../hive/llap/security/SecretManager.java | 91 ++
.../llap/tezplugins/LlapTaskCommunicator.java | 22 +-
.../tezplugins/LlapUmbilicalPolicyProvider.java | 42 +
.../hive/llap/tezplugins/TaskCommunicator.java | 42 +-
.../org.apache.hadoop.security.SecurityInfo | 14 +
...rg.apache.hadoop.security.token.TokenRenewer | 14 +
.../src/protobuf/LlapDaemonProtocol.proto | 11 +
.../hive/llap/daemon/MiniLlapCluster.java | 2 +-
.../impl/TestLlapDaemonProtocolServerImpl.java | 3 +-
.../apache/hadoop/hive/ql/io/orc/OrcProto.java | 8 +
.../org/apache/hadoop/hive/ql/exec/DDLTask.java | 2 +-
.../hive/ql/exec/GlobalWorkMapFactory.java | 4 +-
.../hadoop/hive/ql/exec/ObjectCacheFactory.java | 6 +-
.../hive/ql/exec/tez/MapRecordProcessor.java | 4 +-
.../hive/ql/exec/tez/ReduceRecordProcessor.java | 4 +-
.../hive/ql/exec/tez/TezSessionState.java | 38 +-
.../hadoop/hive/ql/io/HiveInputFormat.java | 4 +-
.../hadoop/hive/serde2/AbstractSerDe.java | 2 +-
38 files changed, 2206 insertions(+), 186 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 058f91d..7e9bf61 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -218,6 +218,9 @@ public class HiveConf extends Configuration {
}
}
+ public static final String HIVE_LLAP_DAEMON_SERVICE_PRINCIPAL_NAME = "hive.llap.daemon.service.principal";
+
+
/**
* dbVars are the parameters can be set per database. If these
* parameters are set as a database property, when switching to that
@@ -2117,16 +2120,6 @@ public class HiveConf extends Configuration {
HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist", "set,reset,dfs,add,list,delete,reload,compile",
"Comma separated list of non-SQL Hive commands users are authorized to execute"),
- HIVE_CONF_RESTRICTED_LIST("hive.conf.restricted.list",
- "hive.security.authenticator.manager,hive.security.authorization.manager,hive.users.in.admin.role",
- "Comma separated list of configuration options which are immutable at runtime"),
- HIVE_CONF_HIDDEN_LIST("hive.conf.hidden.list",
- METASTOREPWD.varname + "," + HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname,
- "Comma separated list of configuration options which should not be read by normal user like passwords"),
-
- HIVE_CONF_INTERNAL_VARIABLE_LIST("hive.conf.internal.variable.list",
- "hive.added.files.path,hive.added.jars.path,hive.added.archives.path",
- "Comma separated list of variables which are used internally and should not be configurable."),
// If this is set all move tasks at the end of a multi-insert query will only begin once all
// outputs are ready
@@ -2377,6 +2370,26 @@ public class HiveConf extends Configuration {
"By default, percentile latency metrics are disabled."),
LLAP_IO_THREADPOOL_SIZE("hive.llap.io.threadpool.size", 10,
"Specify the number of threads to use for low-level IO thread pool."),
+ LLAP_KERBEROS_PRINCIPAL(HIVE_LLAP_DAEMON_SERVICE_PRINCIPAL_NAME, "",
+ "The name of the LLAP daemon's service principal."),
+ LLAP_KERBEROS_KEYTAB_FILE("hive.llap.daemon.keytab.file", "",
+ "The path to the Kerberos Keytab file containing the LLAP daemon's service principal."),
+ LLAP_ZKSM_KERBEROS_PRINCIPAL("hive.llap.zk.sm.principal", "",
+ "The name of the principal to use to talk to ZooKeeper for ZooKeeper SecretManager."),
+ LLAP_ZKSM_KERBEROS_KEYTAB_FILE("hive.llap.zk.sm.keytab.file", "",
+ "The path to the Kerberos Keytab file containing the principal to use to talk to\n" +
+ "ZooKeeper for ZooKeeper SecretManager."),
+ LLAP_ZKSM_ZK_CONNECTION_STRING("hive.llap.zk.sm.connectionString", "",
+ "ZooKeeper connection string for ZooKeeper SecretManager."),
+ LLAP_SECURITY_ACL("hive.llap.daemon.service.acl", "*", "The ACL for LLAP daemon."),
+ LLAP_MANAGEMENT_ACL("hive.llap.management.service.acl", "*",
+ "The ACL for LLAP daemon management."),
+ // Hadoop DelegationTokenManager default is 1 week.
+ LLAP_DELEGATION_TOKEN_LIFETIME("hive.llap.daemon.delegation.token.lifetime", "14d",
+ new TimeValidator(TimeUnit.SECONDS),
+ "LLAP delegation token lifetime, in seconds if specified without a unit."),
+ LLAP_MANAGEMENT_RPC_PORT("hive.llap.management.rpc.port", 15004,
+ "RPC port for LLAP daemon management service."),
LLAP_DAEMON_RPC_NUM_HANDLERS("hive.llap.daemon.rpc.num.handlers", 5,
"Number of RPC handlers for LLAP daemon.", "llap.daemon.rpc.num.handlers"),
@@ -2532,7 +2545,18 @@ public class HiveConf extends Configuration {
"Expected inflation factor between disk/in memory representation of hash tables"),
HIVE_LOG_TRACE_ID("hive.log.trace.id", "",
"Log tracing id that can be used by upstream clients for tracking respective logs. " +
- "Truncated to " + LOG_PREFIX_LENGTH + " characters. Defaults to use auto-generated session id.");
+ "Truncated to " + LOG_PREFIX_LENGTH + " characters. Defaults to use auto-generated session id."),
+
+
+ HIVE_CONF_RESTRICTED_LIST("hive.conf.restricted.list",
+ "hive.security.authenticator.manager,hive.security.authorization.manager,hive.users.in.admin.role",
+ "Comma separated list of configuration options which are immutable at runtime"),
+ HIVE_CONF_HIDDEN_LIST("hive.conf.hidden.list",
+ METASTOREPWD.varname + "," + HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname,
+ "Comma separated list of configuration options which should not be read by normal user like passwords"),
+ HIVE_CONF_INTERNAL_VARIABLE_LIST("hive.conf.internal.variable.list",
+ "hive.added.files.path,hive.added.jars.path,hive.added.archives.path",
+ "Comma separated list of variables which are used internally and should not be configurable.");
public final String varname;
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIoProxy.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIoProxy.java b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIoProxy.java
deleted file mode 100644
index 4c31e32..0000000
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIoProxy.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.llap.io.api;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-
-import org.apache.hadoop.conf.Configuration;
-
-
-@SuppressWarnings("rawtypes")
-public class LlapIoProxy {
- private final static String IMPL_CLASS = "org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl";
-
- // Llap server depends on Hive execution, so the reverse cannot be true. We create the I/O
- // singleton once (on daemon startup); the said singleton serves as the IO interface.
- private static LlapIo io = null;
-
- private static boolean isDaemon = false;
-
- public static void setDaemon(boolean isDaemon) {
- LlapIoProxy.isDaemon = isDaemon;
- }
-
- public static boolean isDaemon() {
- return isDaemon;
- }
-
- public static LlapIo getIo() {
- return io;
- }
-
- public static void initializeLlapIo(Configuration conf) {
-
- if (io != null) {
- return; // already initialized
- }
-
- try {
- io = createIoImpl(conf);
- } catch (IOException e) {
- throw new RuntimeException("Cannot initialize local server", e);
- }
- }
-
- private static LlapIo createIoImpl(Configuration conf) throws IOException {
- try {
- @SuppressWarnings("unchecked")
- Class<? extends LlapIo> clazz = (Class<? extends LlapIo>)Class.forName(IMPL_CLASS);
- Constructor<? extends LlapIo> ctor = clazz.getDeclaredConstructor(Configuration.class);
- ctor.setAccessible(true);
- return ctor.newInstance(conf);
- } catch (Exception e) {
- throw new RuntimeException("Failed to create impl class", e);
- }
- }
-
- public static void close() {
- if (io != null) {
- io.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapProxy.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapProxy.java b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapProxy.java
new file mode 100644
index 0000000..6359bab
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapProxy.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.api;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.security.LlapTokenProvider;
+
+
+@SuppressWarnings("rawtypes")
+public class LlapProxy {
+ private final static String IMPL_CLASS = "org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl";
+ private final static String TOKEN_CLASS =
+ "org.apache.hadoop.hive.llap.security.LlapSecurityHelper";
+
+ // Llap server depends on Hive execution, so the reverse cannot be true. We create the I/O
+ // singleton once (on daemon startup); the said singleton serves as the IO interface.
+ private static LlapIo io = null;
+ private static LlapTokenProvider tokenProvider = null;
+ private static final Object tpInitLock = new Object();
+ private static volatile boolean isTpInitDone = false;
+
+ private static boolean isDaemon = false;
+
+ public static void setDaemon(boolean isDaemon) {
+ LlapProxy.isDaemon = isDaemon;
+ }
+
+ public static boolean isDaemon() {
+ return isDaemon;
+ }
+
+ public static LlapIo getIo() {
+ return io;
+ }
+
+ public static void initializeLlapIo(Configuration conf) {
+ if (io != null) {
+ return; // already initialized
+ }
+
+ try {
+ io = createIoImpl(conf);
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot initialize local server", e);
+ }
+ }
+
+ private static LlapIo createIoImpl(Configuration conf) throws IOException {
+ try {
+ @SuppressWarnings("unchecked")
+ Class<? extends LlapIo> clazz = (Class<? extends LlapIo>)Class.forName(IMPL_CLASS);
+ Constructor<? extends LlapIo> ctor = clazz.getDeclaredConstructor(Configuration.class);
+ ctor.setAccessible(true);
+ return ctor.newInstance(conf);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to create impl class", e);
+ }
+ }
+
+ public static LlapTokenProvider getOrInitTokenProvider(Configuration conf) {
+ if (isTpInitDone) return tokenProvider;
+ synchronized (tpInitLock) {
+ if (isTpInitDone) return tokenProvider;
+ try {
+ tokenProvider = createTokenProviderImpl(conf);
+ isTpInitDone = true;
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot initialize token provider", e);
+ }
+ return tokenProvider;
+ }
+ }
+
+ private static LlapTokenProvider createTokenProviderImpl(Configuration conf) throws IOException {
+ try {
+ @SuppressWarnings("unchecked")
+ Class<? extends LlapTokenProvider> clazz =
+ (Class<? extends LlapTokenProvider>)Class.forName(TOKEN_CLASS);
+ Constructor<? extends LlapTokenProvider> ctor =
+ clazz.getDeclaredConstructor(Configuration.class);
+ ctor.setAccessible(true);
+ return ctor.newInstance(conf);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to create token provider class", e);
+ }
+ }
+
+ public static void close() {
+ if (io != null) {
+ io.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java
index f116de4..2bd860a 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java
@@ -40,6 +40,13 @@ public interface ServiceInstance {
public int getRpcPort();
/**
+ * Management endpoint for service instance
+ *
+ * @return
+ */
+ public int getManagementPort();
+
+ /**
* Shuffle Endpoint for service instance
*
* @return
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
index a085427..ef9de32 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
@@ -46,6 +46,7 @@ public class LlapFixedRegistryImpl implements ServiceRegistry {
private final int port;
private final int shuffle;
+ private final int mngPort;
private final String[] hosts;
private final int memory;
private final int vcores;
@@ -58,6 +59,7 @@ public class LlapFixedRegistryImpl implements ServiceRegistry {
this.port = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_RPC_PORT);
this.shuffle = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_YARN_SHUFFLE_PORT);
this.resolveHosts = conf.getBoolean(FIXED_REGISTRY_RESOLVE_HOST_NAMES, true);
+ this.mngPort = HiveConf.getIntVar(conf, ConfVars.LLAP_MANAGEMENT_RPC_PORT);
for (Map.Entry<String, String> kv : conf) {
if (kv.getKey().startsWith(LlapConfiguration.LLAP_DAEMON_PREFIX)
@@ -136,6 +138,11 @@ public class LlapFixedRegistryImpl implements ServiceRegistry {
}
@Override
+ public int getManagementPort() {
+ return LlapFixedRegistryImpl.this.mngPort;
+ }
+
+ @Override
public int getShufflePort() {
return LlapFixedRegistryImpl.this.shuffle;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java
index 2673ad7..fc2ebf2 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java
@@ -58,6 +58,10 @@ import com.google.common.base.Preconditions;
public class LlapYarnRegistryImpl implements ServiceRegistry {
+ /** IPC endpoint names. */
+ private static final String IPC_SERVICES = "services",
+ IPC_MNG = "llapmng", IPC_SHUFFLE = "shuffle", IPC_LLAP = "llap";
+
private static final Logger LOG = LoggerFactory.getLogger(LlapYarnRegistryImpl.class);
private final RegistryOperationsService client;
@@ -108,13 +112,13 @@ public class LlapYarnRegistryImpl implements ServiceRegistry {
public Endpoint getRpcEndpoint() {
final int rpcPort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_RPC_PORT);
- return RegistryTypeUtils.ipcEndpoint("llap", new InetSocketAddress(hostname, rpcPort));
+ return RegistryTypeUtils.ipcEndpoint(IPC_LLAP, new InetSocketAddress(hostname, rpcPort));
}
public Endpoint getShuffleEndpoint() {
final int shufflePort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_YARN_SHUFFLE_PORT);
// HTTP today, but might not be
- return RegistryTypeUtils.inetAddrEndpoint("shuffle", ProtocolTypes.PROTOCOL_TCP, hostname,
+ return RegistryTypeUtils.inetAddrEndpoint(IPC_SHUFFLE, ProtocolTypes.PROTOCOL_TCP, hostname,
shufflePort);
}
@@ -125,7 +129,7 @@ public class LlapYarnRegistryImpl implements ServiceRegistry {
final URL serviceURL;
try {
serviceURL = new URL(scheme, hostname, servicePort, "");
- return RegistryTypeUtils.webEndpoint("services", serviceURL.toURI());
+ return RegistryTypeUtils.webEndpoint(IPC_SERVICES, serviceURL.toURI());
} catch (MalformedURLException e) {
throw new RuntimeException(e);
} catch (URISyntaxException e) {
@@ -133,6 +137,11 @@ public class LlapYarnRegistryImpl implements ServiceRegistry {
}
}
+ public Endpoint getMngEndpoint() {
+ return RegistryTypeUtils.ipcEndpoint(IPC_MNG, new InetSocketAddress(hostname,
+ HiveConf.getIntVar(conf, ConfVars.LLAP_MANAGEMENT_RPC_PORT)));
+ }
+
private final String getPath() {
return this.path;
}
@@ -142,6 +151,7 @@ public class LlapYarnRegistryImpl implements ServiceRegistry {
String path = getPath();
ServiceRecord srv = new ServiceRecord();
srv.addInternalEndpoint(getRpcEndpoint());
+ srv.addInternalEndpoint(getMngEndpoint());
srv.addInternalEndpoint(getShuffleEndpoint());
srv.addExternalEndpoint(getServicesEndpoint());
@@ -174,13 +184,15 @@ public class LlapYarnRegistryImpl implements ServiceRegistry {
private boolean alive = true;
private final String host;
private final int rpcPort;
+ private final int mngPort;
private final int shufflePort;
public DynamicServiceInstance(ServiceRecord srv) throws IOException {
this.srv = srv;
- final Endpoint shuffle = srv.getInternalEndpoint("shuffle");
- final Endpoint rpc = srv.getInternalEndpoint("llap");
+ final Endpoint shuffle = srv.getInternalEndpoint(IPC_SHUFFLE);
+ final Endpoint rpc = srv.getInternalEndpoint(IPC_LLAP);
+ final Endpoint mng = srv.getInternalEndpoint(IPC_MNG);
this.host =
RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
@@ -188,6 +200,9 @@ public class LlapYarnRegistryImpl implements ServiceRegistry {
this.rpcPort =
Integer.valueOf(RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
AddressTypes.ADDRESS_PORT_FIELD));
+ this.mngPort =
+ Integer.valueOf(RegistryTypeUtils.getAddressField(mng.addresses.get(0),
+ AddressTypes.ADDRESS_PORT_FIELD));
this.shufflePort =
Integer.valueOf(RegistryTypeUtils.getAddressField(shuffle.addresses.get(0),
AddressTypes.ADDRESS_PORT_FIELD));
@@ -241,6 +256,11 @@ public class LlapYarnRegistryImpl implements ServiceRegistry {
return "DynamicServiceInstance [alive=" + alive + ", host=" + host + ":" + rpcPort + " with resources=" + getResource() +"]";
}
+ @Override
+ public int getManagementPort() {
+ return mngPort;
+ }
+
// Relying on the identity hashCode and equality, since refreshing instances retains the old copy
// of an already known instance.
}
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java
new file mode 100644
index 0000000..f0bb495
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.security;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+
+/** For now, a LLAP token gives access to any LLAP server. */
+public class LlapTokenIdentifier extends AbstractDelegationTokenIdentifier {
+ private static final String KIND = "LLAP_TOKEN";
+ public static final Text KIND_NAME = new Text(KIND);
+
+ public LlapTokenIdentifier() {
+ super();
+ }
+
+ public LlapTokenIdentifier(Text owner, Text renewer, Text realUser) {
+ super(owner, renewer, realUser);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ // Nothing right now.
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ // Nothing right now.
+ }
+
+ @Override
+ public Text getKind() {
+ return KIND_NAME;
+ }
+
+ @Override
+ public int hashCode() {
+ return -1;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ return (other != null) && other.getClass().isAssignableFrom(this.getClass());
+ }
+
+ @Override
+ public String toString() {
+ return KIND;
+ }
+
+ @InterfaceAudience.Private
+ public static class Renewer extends Token.TrivialRenewer {
+ @Override
+ protected Text getKind() {
+ return KIND_NAME;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java
new file mode 100644
index 0000000..2e99a28
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.security;
+
+import java.io.IOException;
+
+import org.apache.hadoop.security.token.Token;
+
+public interface LlapTokenProvider {
+ Token<LlapTokenIdentifier> getDelegationToken() throws IOException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
----------------------------------------------------------------------
diff --git a/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java b/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
index b044df9..af009b8 100644
--- a/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
+++ b/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
@@ -11991,6 +11991,781 @@ public final class LlapDaemonProtocolProtos {
// @@protoc_insertion_point(class_scope:TerminateFragmentResponseProto)
}
+ public interface GetTokenRequestProtoOrBuilder
+ extends com.google.protobuf.MessageOrBuilder {
+ }
+ /**
+ * Protobuf type {@code GetTokenRequestProto}
+ */
+ public static final class GetTokenRequestProto extends
+ com.google.protobuf.GeneratedMessage
+ implements GetTokenRequestProtoOrBuilder {
+ // Use GetTokenRequestProto.newBuilder() to construct.
+ private GetTokenRequestProto(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
+ super(builder);
+ this.unknownFields = builder.getUnknownFields();
+ }
+ private GetTokenRequestProto(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+ private static final GetTokenRequestProto defaultInstance;
+ public static GetTokenRequestProto getDefaultInstance() {
+ return defaultInstance;
+ }
+
+ public GetTokenRequestProto getDefaultInstanceForType() {
+ return defaultInstance;
+ }
+
+ private final com.google.protobuf.UnknownFieldSet unknownFields;
+ @java.lang.Override
+ public final com.google.protobuf.UnknownFieldSet
+ getUnknownFields() {
+ return this.unknownFields;
+ }
+ private GetTokenRequestProto(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ initFields();
+ com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+ com.google.protobuf.UnknownFieldSet.newBuilder();
+ try {
+ boolean done = false;
+ while (!done) {
+ int tag = input.readTag();
+ switch (tag) {
+ case 0:
+ done = true;
+ break;
+ default: {
+ if (!parseUnknownField(input, unknownFields,
+ extensionRegistry, tag)) {
+ done = true;
+ }
+ break;
+ }
+ }
+ }
+ } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+ throw e.setUnfinishedMessage(this);
+ } catch (java.io.IOException e) {
+ throw new com.google.protobuf.InvalidProtocolBufferException(
+ e.getMessage()).setUnfinishedMessage(this);
+ } finally {
+ this.unknownFields = unknownFields.build();
+ makeExtensionsImmutable();
+ }
+ }
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_GetTokenRequestProto_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_GetTokenRequestProto_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto.class, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto.Builder.class);
+ }
+
+ public static com.google.protobuf.Parser<GetTokenRequestProto> PARSER =
+ new com.google.protobuf.AbstractParser<GetTokenRequestProto>() {
+ public GetTokenRequestProto parsePartialFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return new GetTokenRequestProto(input, extensionRegistry);
+ }
+ };
+
+ @java.lang.Override
+ public com.google.protobuf.Parser<GetTokenRequestProto> getParserForType() {
+ return PARSER;
+ }
+
+ private void initFields() {
+ }
+ private byte memoizedIsInitialized = -1;
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized != -1) return isInitialized == 1;
+
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ public void writeTo(com.google.protobuf.CodedOutputStream output)
+ throws java.io.IOException {
+ getSerializedSize();
+ getUnknownFields().writeTo(output);
+ }
+
+ private int memoizedSerializedSize = -1;
+ public int getSerializedSize() {
+ int size = memoizedSerializedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ size += getUnknownFields().getSerializedSize();
+ memoizedSerializedSize = size;
+ return size;
+ }
+
+ private static final long serialVersionUID = 0L;
+ @java.lang.Override
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
+ return super.writeReplace();
+ }
+
+ @java.lang.Override
+ public boolean equals(final java.lang.Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (!(obj instanceof org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto)) {
+ return super.equals(obj);
+ }
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto other = (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto) obj;
+
+ boolean result = true;
+ result = result &&
+ getUnknownFields().equals(other.getUnknownFields());
+ return result;
+ }
+
+ private int memoizedHashCode = 0;
+ @java.lang.Override
+ public int hashCode() {
+ if (memoizedHashCode != 0) {
+ return memoizedHashCode;
+ }
+ int hash = 41;
+ hash = (19 * hash) + getDescriptorForType().hashCode();
+ hash = (29 * hash) + getUnknownFields().hashCode();
+ memoizedHashCode = hash;
+ return hash;
+ }
+
+ public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto parseFrom(
+ com.google.protobuf.ByteString data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto parseFrom(
+ com.google.protobuf.ByteString data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto parseFrom(byte[] data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto parseFrom(
+ byte[] data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto parseFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+ public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input);
+ }
+ public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto parseDelimitedFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input, extensionRegistry);
+ }
+ public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto parseFrom(
+ com.google.protobuf.CodedInputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto parseFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+
+ public static Builder newBuilder() { return Builder.create(); }
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto prototype) {
+ return newBuilder().mergeFrom(prototype);
+ }
+ public Builder toBuilder() { return newBuilder(this); }
+
+ @java.lang.Override
+ protected Builder newBuilderForType(
+ com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ Builder builder = new Builder(parent);
+ return builder;
+ }
+ /**
+ * Protobuf type {@code GetTokenRequestProto}
+ */
+ public static final class Builder extends
+ com.google.protobuf.GeneratedMessage.Builder<Builder>
+ implements org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProtoOrBuilder {
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_GetTokenRequestProto_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_GetTokenRequestProto_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto.class, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto.Builder.class);
+ }
+
+ // Construct using org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto.newBuilder()
+ private Builder() {
+ maybeForceBuilderInitialization();
+ }
+
+ private Builder(
+ com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ super(parent);
+ maybeForceBuilderInitialization();
+ }
+ private void maybeForceBuilderInitialization() {
+ if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+ }
+ }
+ private static Builder create() {
+ return new Builder();
+ }
+
+ public Builder clear() {
+ super.clear();
+ return this;
+ }
+
+ public Builder clone() {
+ return create().mergeFrom(buildPartial());
+ }
+
+ public com.google.protobuf.Descriptors.Descriptor
+ getDescriptorForType() {
+ return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_GetTokenRequestProto_descriptor;
+ }
+
+ public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto getDefaultInstanceForType() {
+ return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto.getDefaultInstance();
+ }
+
+ public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto build() {
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto buildPartial() {
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto result = new org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto(this);
+ onBuilt();
+ return result;
+ }
+
+ public Builder mergeFrom(com.google.protobuf.Message other) {
+ if (other instanceof org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto) {
+ return mergeFrom((org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto)other);
+ } else {
+ super.mergeFrom(other);
+ return this;
+ }
+ }
+
+ public Builder mergeFrom(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto other) {
+ if (other == org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto.getDefaultInstance()) return this;
+ this.mergeUnknownFields(other.getUnknownFields());
+ return this;
+ }
+
+ public final boolean isInitialized() {
+ return true;
+ }
+
+ public Builder mergeFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto parsedMessage = null;
+ try {
+ parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+ } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+ parsedMessage = (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto) e.getUnfinishedMessage();
+ throw e;
+ } finally {
+ if (parsedMessage != null) {
+ mergeFrom(parsedMessage);
+ }
+ }
+ return this;
+ }
+
+ // @@protoc_insertion_point(builder_scope:GetTokenRequestProto)
+ }
+
+ static {
+ defaultInstance = new GetTokenRequestProto(true);
+ defaultInstance.initFields();
+ }
+
+ // @@protoc_insertion_point(class_scope:GetTokenRequestProto)
+ }
+
+ public interface GetTokenResponseProtoOrBuilder
+ extends com.google.protobuf.MessageOrBuilder {
+
+ // optional bytes token = 1;
+ /**
+ * <code>optional bytes token = 1;</code>
+ */
+ boolean hasToken();
+ /**
+ * <code>optional bytes token = 1;</code>
+ */
+ com.google.protobuf.ByteString getToken();
+ }
+ /**
+ * Protobuf type {@code GetTokenResponseProto}
+ */
+ public static final class GetTokenResponseProto extends
+ com.google.protobuf.GeneratedMessage
+ implements GetTokenResponseProtoOrBuilder {
+ // Use GetTokenResponseProto.newBuilder() to construct.
+ private GetTokenResponseProto(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
+ super(builder);
+ this.unknownFields = builder.getUnknownFields();
+ }
+ private GetTokenResponseProto(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+ private static final GetTokenResponseProto defaultInstance;
+ public static GetTokenResponseProto getDefaultInstance() {
+ return defaultInstance;
+ }
+
+ public GetTokenResponseProto getDefaultInstanceForType() {
+ return defaultInstance;
+ }
+
+ private final com.google.protobuf.UnknownFieldSet unknownFields;
+ @java.lang.Override
+ public final com.google.protobuf.UnknownFieldSet
+ getUnknownFields() {
+ return this.unknownFields;
+ }
+ private GetTokenResponseProto(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ initFields();
+ int mutable_bitField0_ = 0;
+ com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+ com.google.protobuf.UnknownFieldSet.newBuilder();
+ try {
+ boolean done = false;
+ while (!done) {
+ int tag = input.readTag();
+ switch (tag) {
+ case 0:
+ done = true;
+ break;
+ default: {
+ if (!parseUnknownField(input, unknownFields,
+ extensionRegistry, tag)) {
+ done = true;
+ }
+ break;
+ }
+ case 10: {
+ bitField0_ |= 0x00000001;
+ token_ = input.readBytes();
+ break;
+ }
+ }
+ }
+ } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+ throw e.setUnfinishedMessage(this);
+ } catch (java.io.IOException e) {
+ throw new com.google.protobuf.InvalidProtocolBufferException(
+ e.getMessage()).setUnfinishedMessage(this);
+ } finally {
+ this.unknownFields = unknownFields.build();
+ makeExtensionsImmutable();
+ }
+ }
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_GetTokenResponseProto_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_GetTokenResponseProto_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto.class, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto.Builder.class);
+ }
+
+ public static com.google.protobuf.Parser<GetTokenResponseProto> PARSER =
+ new com.google.protobuf.AbstractParser<GetTokenResponseProto>() {
+ public GetTokenResponseProto parsePartialFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return new GetTokenResponseProto(input, extensionRegistry);
+ }
+ };
+
+ @java.lang.Override
+ public com.google.protobuf.Parser<GetTokenResponseProto> getParserForType() {
+ return PARSER;
+ }
+
+ private int bitField0_;
+ // optional bytes token = 1;
+ public static final int TOKEN_FIELD_NUMBER = 1;
+ private com.google.protobuf.ByteString token_;
+ /**
+ * <code>optional bytes token = 1;</code>
+ */
+ public boolean hasToken() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ /**
+ * <code>optional bytes token = 1;</code>
+ */
+ public com.google.protobuf.ByteString getToken() {
+ return token_;
+ }
+
+ private void initFields() {
+ token_ = com.google.protobuf.ByteString.EMPTY;
+ }
+ private byte memoizedIsInitialized = -1;
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized != -1) return isInitialized == 1;
+
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ public void writeTo(com.google.protobuf.CodedOutputStream output)
+ throws java.io.IOException {
+ getSerializedSize();
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ output.writeBytes(1, token_);
+ }
+ getUnknownFields().writeTo(output);
+ }
+
+ private int memoizedSerializedSize = -1;
+ public int getSerializedSize() {
+ int size = memoizedSerializedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(1, token_);
+ }
+ size += getUnknownFields().getSerializedSize();
+ memoizedSerializedSize = size;
+ return size;
+ }
+
+ private static final long serialVersionUID = 0L;
+ @java.lang.Override
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
+ return super.writeReplace();
+ }
+
+ @java.lang.Override
+ public boolean equals(final java.lang.Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (!(obj instanceof org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto)) {
+ return super.equals(obj);
+ }
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto other = (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto) obj;
+
+ boolean result = true;
+ result = result && (hasToken() == other.hasToken());
+ if (hasToken()) {
+ result = result && getToken()
+ .equals(other.getToken());
+ }
+ result = result &&
+ getUnknownFields().equals(other.getUnknownFields());
+ return result;
+ }
+
+ private int memoizedHashCode = 0;
+ @java.lang.Override
+ public int hashCode() {
+ if (memoizedHashCode != 0) {
+ return memoizedHashCode;
+ }
+ int hash = 41;
+ hash = (19 * hash) + getDescriptorForType().hashCode();
+ if (hasToken()) {
+ hash = (37 * hash) + TOKEN_FIELD_NUMBER;
+ hash = (53 * hash) + getToken().hashCode();
+ }
+ hash = (29 * hash) + getUnknownFields().hashCode();
+ memoizedHashCode = hash;
+ return hash;
+ }
+
+ public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto parseFrom(
+ com.google.protobuf.ByteString data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto parseFrom(
+ com.google.protobuf.ByteString data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto parseFrom(byte[] data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto parseFrom(
+ byte[] data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto parseFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+ public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input);
+ }
+ public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto parseDelimitedFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input, extensionRegistry);
+ }
+ public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto parseFrom(
+ com.google.protobuf.CodedInputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto parseFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+
+ public static Builder newBuilder() { return Builder.create(); }
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto prototype) {
+ return newBuilder().mergeFrom(prototype);
+ }
+ public Builder toBuilder() { return newBuilder(this); }
+
+ @java.lang.Override
+ protected Builder newBuilderForType(
+ com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ Builder builder = new Builder(parent);
+ return builder;
+ }
+ /**
+ * Protobuf type {@code GetTokenResponseProto}
+ */
+ public static final class Builder extends
+ com.google.protobuf.GeneratedMessage.Builder<Builder>
+ implements org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProtoOrBuilder {
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_GetTokenResponseProto_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_GetTokenResponseProto_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto.class, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto.Builder.class);
+ }
+
+ // Construct using org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto.newBuilder()
+ private Builder() {
+ maybeForceBuilderInitialization();
+ }
+
+ private Builder(
+ com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ super(parent);
+ maybeForceBuilderInitialization();
+ }
+ private void maybeForceBuilderInitialization() {
+ if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+ }
+ }
+ private static Builder create() {
+ return new Builder();
+ }
+
+ public Builder clear() {
+ super.clear();
+ token_ = com.google.protobuf.ByteString.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00000001);
+ return this;
+ }
+
+ public Builder clone() {
+ return create().mergeFrom(buildPartial());
+ }
+
+ public com.google.protobuf.Descriptors.Descriptor
+ getDescriptorForType() {
+ return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_GetTokenResponseProto_descriptor;
+ }
+
+ public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto getDefaultInstanceForType() {
+ return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto.getDefaultInstance();
+ }
+
+ public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto build() {
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto buildPartial() {
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto result = new org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto(this);
+ int from_bitField0_ = bitField0_;
+ int to_bitField0_ = 0;
+ if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+ to_bitField0_ |= 0x00000001;
+ }
+ result.token_ = token_;
+ result.bitField0_ = to_bitField0_;
+ onBuilt();
+ return result;
+ }
+
+ public Builder mergeFrom(com.google.protobuf.Message other) {
+ if (other instanceof org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto) {
+ return mergeFrom((org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto)other);
+ } else {
+ super.mergeFrom(other);
+ return this;
+ }
+ }
+
+ public Builder mergeFrom(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto other) {
+ if (other == org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto.getDefaultInstance()) return this;
+ if (other.hasToken()) {
+ setToken(other.getToken());
+ }
+ this.mergeUnknownFields(other.getUnknownFields());
+ return this;
+ }
+
+ public final boolean isInitialized() {
+ return true;
+ }
+
+ public Builder mergeFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto parsedMessage = null;
+ try {
+ parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+ } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+ parsedMessage = (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto) e.getUnfinishedMessage();
+ throw e;
+ } finally {
+ if (parsedMessage != null) {
+ mergeFrom(parsedMessage);
+ }
+ }
+ return this;
+ }
+ private int bitField0_;
+
+ // optional bytes token = 1;
+ private com.google.protobuf.ByteString token_ = com.google.protobuf.ByteString.EMPTY;
+ /**
+ * <code>optional bytes token = 1;</code>
+ */
+ public boolean hasToken() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ /**
+ * <code>optional bytes token = 1;</code>
+ */
+ public com.google.protobuf.ByteString getToken() {
+ return token_;
+ }
+ /**
+ * <code>optional bytes token = 1;</code>
+ */
+ public Builder setToken(com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000001;
+ token_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional bytes token = 1;</code>
+ */
+ public Builder clearToken() {
+ bitField0_ = (bitField0_ & ~0x00000001);
+ token_ = getDefaultInstance().getToken();
+ onChanged();
+ return this;
+ }
+
+ // @@protoc_insertion_point(builder_scope:GetTokenResponseProto)
+ }
+
+ static {
+ defaultInstance = new GetTokenResponseProto(true);
+ defaultInstance.initFields();
+ }
+
+ // @@protoc_insertion_point(class_scope:GetTokenResponseProto)
+ }
+
/**
* Protobuf service {@code LlapDaemonProtocol}
*/
@@ -12436,6 +13211,238 @@ public final class LlapDaemonProtocolProtos {
// @@protoc_insertion_point(class_scope:LlapDaemonProtocol)
}
+ /**
+ * Protobuf service {@code LlapManagementProtocol}
+ */
+ public static abstract class LlapManagementProtocol
+ implements com.google.protobuf.Service {
+ protected LlapManagementProtocol() {}
+
+ public interface Interface {
+ /**
+ * <code>rpc getDelegationToken(.GetTokenRequestProto) returns (.GetTokenResponseProto);</code>
+ */
+ public abstract void getDelegationToken(
+ com.google.protobuf.RpcController controller,
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto request,
+ com.google.protobuf.RpcCallback<org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto> done);
+
+ }
+
+ public static com.google.protobuf.Service newReflectiveService(
+ final Interface impl) {
+ return new LlapManagementProtocol() {
+ @java.lang.Override
+ public void getDelegationToken(
+ com.google.protobuf.RpcController controller,
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto request,
+ com.google.protobuf.RpcCallback<org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto> done) {
+ impl.getDelegationToken(controller, request, done);
+ }
+
+ };
+ }
+
+ public static com.google.protobuf.BlockingService
+ newReflectiveBlockingService(final BlockingInterface impl) {
+ return new com.google.protobuf.BlockingService() {
+ public final com.google.protobuf.Descriptors.ServiceDescriptor
+ getDescriptorForType() {
+ return getDescriptor();
+ }
+
+ public final com.google.protobuf.Message callBlockingMethod(
+ com.google.protobuf.Descriptors.MethodDescriptor method,
+ com.google.protobuf.RpcController controller,
+ com.google.protobuf.Message request)
+ throws com.google.protobuf.ServiceException {
+ if (method.getService() != getDescriptor()) {
+ throw new java.lang.IllegalArgumentException(
+ "Service.callBlockingMethod() given method descriptor for " +
+ "wrong service type.");
+ }
+ switch(method.getIndex()) {
+ case 0:
+ return impl.getDelegationToken(controller, (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto)request);
+ default:
+ throw new java.lang.AssertionError("Can't get here.");
+ }
+ }
+
+ public final com.google.protobuf.Message
+ getRequestPrototype(
+ com.google.protobuf.Descriptors.MethodDescriptor method) {
+ if (method.getService() != getDescriptor()) {
+ throw new java.lang.IllegalArgumentException(
+ "Service.getRequestPrototype() given method " +
+ "descriptor for wrong service type.");
+ }
+ switch(method.getIndex()) {
+ case 0:
+ return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto.getDefaultInstance();
+ default:
+ throw new java.lang.AssertionError("Can't get here.");
+ }
+ }
+
+ public final com.google.protobuf.Message
+ getResponsePrototype(
+ com.google.protobuf.Descriptors.MethodDescriptor method) {
+ if (method.getService() != getDescriptor()) {
+ throw new java.lang.IllegalArgumentException(
+ "Service.getResponsePrototype() given method " +
+ "descriptor for wrong service type.");
+ }
+ switch(method.getIndex()) {
+ case 0:
+ return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto.getDefaultInstance();
+ default:
+ throw new java.lang.AssertionError("Can't get here.");
+ }
+ }
+
+ };
+ }
+
+ /**
+ * <code>rpc getDelegationToken(.GetTokenRequestProto) returns (.GetTokenResponseProto);</code>
+ */
+ public abstract void getDelegationToken(
+ com.google.protobuf.RpcController controller,
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto request,
+ com.google.protobuf.RpcCallback<org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto> done);
+
+ public static final
+ com.google.protobuf.Descriptors.ServiceDescriptor
+ getDescriptor() {
+ return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.getDescriptor().getServices().get(1);
+ }
+ public final com.google.protobuf.Descriptors.ServiceDescriptor
+ getDescriptorForType() {
+ return getDescriptor();
+ }
+
+ public final void callMethod(
+ com.google.protobuf.Descriptors.MethodDescriptor method,
+ com.google.protobuf.RpcController controller,
+ com.google.protobuf.Message request,
+ com.google.protobuf.RpcCallback<
+ com.google.protobuf.Message> done) {
+ if (method.getService() != getDescriptor()) {
+ throw new java.lang.IllegalArgumentException(
+ "Service.callMethod() given method descriptor for wrong " +
+ "service type.");
+ }
+ switch(method.getIndex()) {
+ case 0:
+ this.getDelegationToken(controller, (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto)request,
+ com.google.protobuf.RpcUtil.<org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto>specializeCallback(
+ done));
+ return;
+ default:
+ throw new java.lang.AssertionError("Can't get here.");
+ }
+ }
+
+ public final com.google.protobuf.Message
+ getRequestPrototype(
+ com.google.protobuf.Descriptors.MethodDescriptor method) {
+ if (method.getService() != getDescriptor()) {
+ throw new java.lang.IllegalArgumentException(
+ "Service.getRequestPrototype() given method " +
+ "descriptor for wrong service type.");
+ }
+ switch(method.getIndex()) {
+ case 0:
+ return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto.getDefaultInstance();
+ default:
+ throw new java.lang.AssertionError("Can't get here.");
+ }
+ }
+
+ public final com.google.protobuf.Message
+ getResponsePrototype(
+ com.google.protobuf.Descriptors.MethodDescriptor method) {
+ if (method.getService() != getDescriptor()) {
+ throw new java.lang.IllegalArgumentException(
+ "Service.getResponsePrototype() given method " +
+ "descriptor for wrong service type.");
+ }
+ switch(method.getIndex()) {
+ case 0:
+ return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto.getDefaultInstance();
+ default:
+ throw new java.lang.AssertionError("Can't get here.");
+ }
+ }
+
+ public static Stub newStub(
+ com.google.protobuf.RpcChannel channel) {
+ return new Stub(channel);
+ }
+
+ public static final class Stub extends org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapManagementProtocol implements Interface {
+ private Stub(com.google.protobuf.RpcChannel channel) {
+ this.channel = channel;
+ }
+
+ private final com.google.protobuf.RpcChannel channel;
+
+ public com.google.protobuf.RpcChannel getChannel() {
+ return channel;
+ }
+
+ public void getDelegationToken(
+ com.google.protobuf.RpcController controller,
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto request,
+ com.google.protobuf.RpcCallback<org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto> done) {
+ channel.callMethod(
+ getDescriptor().getMethods().get(0),
+ controller,
+ request,
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto.getDefaultInstance(),
+ com.google.protobuf.RpcUtil.generalizeCallback(
+ done,
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto.class,
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto.getDefaultInstance()));
+ }
+ }
+
+ public static BlockingInterface newBlockingStub(
+ com.google.protobuf.BlockingRpcChannel channel) {
+ return new BlockingStub(channel);
+ }
+
+ public interface BlockingInterface {
+ public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto getDelegationToken(
+ com.google.protobuf.RpcController controller,
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto request)
+ throws com.google.protobuf.ServiceException;
+ }
+
+ private static final class BlockingStub implements BlockingInterface {
+ private BlockingStub(com.google.protobuf.BlockingRpcChannel channel) {
+ this.channel = channel;
+ }
+
+ private final com.google.protobuf.BlockingRpcChannel channel;
+
+ public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto getDelegationToken(
+ com.google.protobuf.RpcController controller,
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto request)
+ throws com.google.protobuf.ServiceException {
+ return (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto) channel.callBlockingMethod(
+ getDescriptor().getMethods().get(0),
+ controller,
+ request,
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto.getDefaultInstance());
+ }
+
+ }
+
+ // @@protoc_insertion_point(class_scope:LlapManagementProtocol)
+ }
+
private static com.google.protobuf.Descriptors.Descriptor
internal_static_UserPayloadProto_descriptor;
private static
@@ -12506,6 +13513,16 @@ public final class LlapDaemonProtocolProtos {
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_TerminateFragmentResponseProto_fieldAccessorTable;
+ private static com.google.protobuf.Descriptors.Descriptor
+ internal_static_GetTokenRequestProto_descriptor;
+ private static
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internal_static_GetTokenRequestProto_fieldAccessorTable;
+ private static com.google.protobuf.Descriptors.Descriptor
+ internal_static_GetTokenResponseProto_descriptor;
+ private static
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internal_static_GetTokenResponseProto_fieldAccessorTable;
public static com.google.protobuf.Descriptors.FileDescriptor
getDescriptor() {
@@ -12558,19 +13575,23 @@ public final class LlapDaemonProtocolProtos {
"mpleteResponseProto\"g\n\035TerminateFragment" +
"RequestProto\022\020\n\010query_id\030\001 \001(\t\022\020\n\010dag_na" +
"me\030\002 \001(\t\022\"\n\032fragment_identifier_string\030\007" +
- " \001(\t\" \n\036TerminateFragmentResponseProto*2" +
- "\n\020SourceStateProto\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS" +
- "_RUNNING\020\0022\316\002\n\022LlapDaemonProtocol\022?\n\nsub" +
- "mitWork\022\027.SubmitWorkRequestProto\032\030.Submi" +
- "tWorkResponseProto\022W\n\022sourceStateUpdated" +
- "\022\037.SourceStateUpdatedRequestProto\032 .Sour" +
- "ceStateUpdatedResponseProto\022H\n\rqueryComp",
- "lete\022\032.QueryCompleteRequestProto\032\033.Query" +
- "CompleteResponseProto\022T\n\021terminateFragme" +
- "nt\022\036.TerminateFragmentRequestProto\032\037.Ter" +
- "minateFragmentResponseProtoBH\n&org.apach" +
- "e.hadoop.hive.llap.daemon.rpcB\030LlapDaemo" +
- "nProtocolProtos\210\001\001\240\001\001"
+ " \001(\t\" \n\036TerminateFragmentResponseProto\"\026" +
+ "\n\024GetTokenRequestProto\"&\n\025GetTokenRespon" +
+ "seProto\022\r\n\005token\030\001 \001(\014*2\n\020SourceStatePro" +
+ "to\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS_RUNNING\020\0022\316\002\n\022L" +
+ "lapDaemonProtocol\022?\n\nsubmitWork\022\027.Submit" +
+ "WorkRequestProto\032\030.SubmitWorkResponsePro" +
+ "to\022W\n\022sourceStateUpdated\022\037.SourceStateUp",
+ "datedRequestProto\032 .SourceStateUpdatedRe" +
+ "sponseProto\022H\n\rqueryComplete\022\032.QueryComp" +
+ "leteRequestProto\032\033.QueryCompleteResponse" +
+ "Proto\022T\n\021terminateFragment\022\036.TerminateFr" +
+ "agmentRequestProto\032\037.TerminateFragmentRe" +
+ "sponseProto2]\n\026LlapManagementProtocol\022C\n" +
+ "\022getDelegationToken\022\025.GetTokenRequestPro" +
+ "to\032\026.GetTokenResponseProtoBH\n&org.apache" +
+ ".hadoop.hive.llap.daemon.rpcB\030LlapDaemon" +
+ "ProtocolProtos\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -12661,6 +13682,18 @@ public final class LlapDaemonProtocolProtos {
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_TerminateFragmentResponseProto_descriptor,
new java.lang.String[] { });
+ internal_static_GetTokenRequestProto_descriptor =
+ getDescriptor().getMessageTypes().get(14);
+ internal_static_GetTokenRequestProto_fieldAccessorTable = new
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+ internal_static_GetTokenRequestProto_descriptor,
+ new java.lang.String[] { });
+ internal_static_GetTokenResponseProto_descriptor =
+ getDescriptor().getMessageTypes().get(15);
+ internal_static_GetTokenResponseProto_fieldAccessorTable = new
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+ internal_static_GetTokenResponseProto_descriptor,
+ new java.lang.String[] { "Token", });
return null;
}
};
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonProtocolBlockingPB.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonProtocolBlockingPB.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonProtocolBlockingPB.java
index 5ad2344..4c09941 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonProtocolBlockingPB.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonProtocolBlockingPB.java
@@ -15,8 +15,14 @@
package org.apache.hadoop.hive.llap.daemon;
import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.token.TokenInfo;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.hive.llap.security.LlapTokenSelector;
@ProtocolInfo(protocolName = "org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB", protocolVersion = 1)
+@KerberosInfo(serverPrincipal = HiveConf.HIVE_LLAP_DAEMON_SERVICE_PRINCIPAL_NAME)
+@TokenInfo(LlapTokenSelector.class)
public interface LlapDaemonProtocolBlockingPB extends LlapDaemonProtocolProtos.LlapDaemonProtocol.BlockingInterface {
}
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapManagementProtocolBlockingPB.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapManagementProtocolBlockingPB.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapManagementProtocolBlockingPB.java
new file mode 100644
index 0000000..4efadac
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapManagementProtocolBlockingPB.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.daemon;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.security.KerberosInfo;
+
+@ProtocolInfo(protocolName = "org.apache.hadoop.hive.llap.daemon.LlapManagementProtocolBlockingPB", protocolVersion = 1)
+@KerberosInfo(serverPrincipal = HiveConf.HIVE_LLAP_DAEMON_SERVICE_PRINCIPAL_NAME)
+public interface LlapManagementProtocolBlockingPB extends LlapDaemonProtocolProtos.LlapManagementProtocol.BlockingInterface {
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index dbdf571..7ce8ba0 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -38,7 +38,7 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceSta
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
import org.apache.hadoop.hive.llap.daemon.services.impl.LlapWebServices;
-import org.apache.hadoop.hive.llap.io.api.LlapIoProxy;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem;
import org.apache.hadoop.hive.llap.metrics.MetricsUtils;
@@ -80,19 +80,22 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
private final String[] localDirs;
// TODO Not the best way to share the address
- private final AtomicReference<InetSocketAddress> address = new AtomicReference<>();
+ private final AtomicReference<InetSocketAddress> srvAddress = new AtomicReference<>(),
+ mngAddress = new AtomicReference<>();
private final AtomicReference<Integer> shufflePort = new AtomicReference<>();
public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemoryBytes,
- boolean ioEnabled, boolean isDirectCache, long ioMemoryBytes, String[] localDirs, int rpcPort,
- int shufflePort) {
+ boolean ioEnabled, boolean isDirectCache, long ioMemoryBytes, String[] localDirs, int srvPort,
+ int mngPort, int shufflePort) {
super("LlapDaemon");
printAsciiArt();
Preconditions.checkArgument(numExecutors > 0);
- Preconditions.checkArgument(rpcPort == 0 || (rpcPort > 1024 && rpcPort < 65536),
- "RPC Port must be between 1025 and 65535, or 0 automatic selection");
+ Preconditions.checkArgument(srvPort == 0 || (srvPort > 1024 && srvPort < 65536),
+ "Server RPC Port must be between 1025 and 65535, or 0 automatic selection");
+ Preconditions.checkArgument(mngPort == 0 || (mngPort > 1024 && mngPort < 65536),
+ "Management RPC Port must be between 1025 and 65535, or 0 automatic selection");
Preconditions.checkArgument(localDirs != null && localDirs.length > 0,
"Work dirs must be specified");
Preconditions.checkArgument(shufflePort == 0 || (shufflePort > 1024 && shufflePort < 65536),
@@ -111,7 +114,8 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
daemonConf, ConfVars.LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION);
LOG.info("Attempting to start LlapDaemonConf with the following configuration: " +
"numExecutors=" + numExecutors +
- ", rpcListenerPort=" + rpcPort +
+ ", rpcListenerPort=" + srvPort +
+ ", mngListenerPort=" + mngPort +
", workDirs=" + Arrays.toString(localDirs) +
", shufflePort=" + shufflePort +
", executorMemory=" + executorMemoryBytes +
@@ -154,11 +158,10 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
" sessionId: " + sessionId);
- this.amReporter = new AMReporter(address, new QueryFailedHandlerProxy(), daemonConf);
-
-
- this.server = new LlapDaemonProtocolServerImpl(numHandlers, this, address, rpcPort);
+ this.amReporter = new AMReporter(srvAddress, new QueryFailedHandlerProxy(), daemonConf);
+ this.server = new LlapDaemonProtocolServerImpl(
+ numHandlers, this, srvAddress, mngAddress, srvPort, mngPort);
this.containerRunner = new ContainerRunnerImpl(daemonConf,
numExecutors,
@@ -166,7 +169,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
enablePreemption,
localDirs,
this.shufflePort,
- address,
+ srvAddress,
executorMemoryBytes,
metrics,
amReporter);
@@ -227,8 +230,8 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
@Override
public void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
- LlapIoProxy.setDaemon(true);
- LlapIoProxy.initializeLlapIo(conf);
+ LlapProxy.setDaemon(true);
+ LlapProxy.initializeLlapIo(conf);
}
@Override
@@ -262,7 +265,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
LlapMetricsSystem.shutdown();
}
- LlapIoProxy.close();
+ LlapProxy.close();
}
public static void main(String[] args) throws Exception {
@@ -276,6 +279,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
String[] localDirs = daemonConf.getTrimmedStrings(ConfVars.LLAP_DAEMON_WORK_DIRS.varname);
int rpcPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_RPC_PORT);
+ int mngPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_MANAGEMENT_RPC_PORT);
int shufflePort = daemonConf
.getInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, ShuffleHandler.DEFAULT_SHUFFLE_PORT);
long executorMemoryBytes = HiveConf.getIntVar(
@@ -287,8 +291,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
boolean llapIoEnabled = HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_IO_ENABLED);
llapDaemon =
new LlapDaemon(daemonConf, numExecutors, executorMemoryBytes, llapIoEnabled, isDirectCache,
- cacheMemoryBytes, localDirs,
- rpcPort, shufflePort);
+ cacheMemoryBytes, localDirs, rpcPort, mngPort, shufflePort);
LOG.info("Adding shutdown hook for LlapDaemon");
ShutdownHookManager.addShutdownHook(new CompositeServiceShutdownHook(llapDaemon), 1);
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java
index 4b13277..9c7d2e2 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java
@@ -115,7 +115,6 @@ public class LlapDaemonProtocolClientImpl implements LlapDaemonProtocolBlockingP
}
public LlapDaemonProtocolBlockingPB createProxy() throws IOException {
- // TODO Fix security
RPC.setProtocolEngine(conf, LlapDaemonProtocolBlockingPB.class, ProtobufRpcEngine.class);
ProtocolProxy<LlapDaemonProtocolBlockingPB> proxy =
RPC.getProtocolProxy(LlapDaemonProtocolBlockingPB.class, 0, serverAddr,