You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sp...@apache.org on 2016/05/06 20:42:43 UTC
[12/50] [abbrv] hive git commit: Merge branch 'master' into llap
Merge branch 'master' into llap
Conflicts:
llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e0579097
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e0579097
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e0579097
Branch: refs/heads/java8
Commit: e057909732b40b581fcad3f61fb798600f01ecdf
Parents: 4847f65 8729966
Author: Jason Dere <jd...@hortonworks.com>
Authored: Wed May 4 00:17:12 2016 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Wed May 4 00:17:12 2016 -0700
----------------------------------------------------------------------
HIVE-13509.2.patch | 478 --
.../ext/LlapTaskUmbilicalExternalClient.java | 18 +-
.../daemon/rpc/LlapDaemonProtocolProtos.java | 7000 +++++++++++-------
.../org/apache/hadoop/hive/llap/DaemonId.java | 41 +
.../hive/llap/security/LlapTokenIdentifier.java | 39 +-
.../hive/llap/security/LlapTokenProvider.java | 2 +-
.../apache/hadoop/hive/llap/tez/Converters.java | 84 +-
.../src/protobuf/LlapDaemonProtocol.proto | 70 +-
.../hadoop/hive/llap/tez/TestConverters.java | 51 +-
.../hadoop/hive/llap/LlapBaseInputFormat.java | 32 +-
.../hive/llap/daemon/ContainerRunner.java | 9 +-
.../llap/daemon/impl/ContainerRunnerImpl.java | 135 +-
.../hive/llap/daemon/impl/LlapDaemon.java | 52 +-
.../daemon/impl/LlapProtocolServerImpl.java | 41 +-
.../hive/llap/daemon/impl/LlapTokenChecker.java | 137 +
.../llap/daemon/impl/QueryFragmentInfo.java | 23 +-
.../hadoop/hive/llap/daemon/impl/QueryInfo.java | 26 +-
.../hive/llap/daemon/impl/QueryTracker.java | 97 +-
.../hadoop/hive/llap/daemon/impl/Scheduler.java | 2 +
.../llap/daemon/impl/TaskExecutorService.java | 17 +-
.../llap/daemon/impl/TaskRunnerCallable.java | 77 +-
.../hive/llap/security/LlapSecurityHelper.java | 15 +-
.../hive/llap/security/SecretManager.java | 19 +-
.../hive/llap/daemon/MiniLlapCluster.java | 2 +-
.../daemon/impl/TaskExecutorTestHelpers.java | 44 +-
.../impl/TestLlapDaemonProtocolServerImpl.java | 2 +-
.../llap/daemon/impl/TestLlapTokenChecker.java | 96 +
.../TestFirstInFirstOutComparator.java | 27 +-
.../llap/tezplugins/LlapTaskCommunicator.java | 31 +-
.../hadoop/hive/metastore/txn/TxnHandler.java | 5 +
.../hive/ql/exec/tez/TezSessionState.java | 3 +-
.../hadoop/hive/ql/lockmgr/DbTxnManager.java | 12 +
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 1 -
.../ql/parse/UpdateDeleteSemanticAnalyzer.java | 16 +-
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 22 +
.../hive/ql/lockmgr/TestDbTxnManager2.java | 114 +
.../dynpart_sort_optimization_acid.q.out | 120 +-
37 files changed, 5479 insertions(+), 3481 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/e0579097/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
----------------------------------------------------------------------
diff --cc llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
index fe2fd7c,0000000..6e2c85d
mode 100644,000000..100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
@@@ -1,413 -1,0 +1,421 @@@
+package org.apache.hadoop.hive.llap.ext;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.collections4.ListUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
++import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
++import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexIdentifier;
+import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
++import org.apache.hadoop.hive.llap.tez.Converters;
+import org.apache.hadoop.hive.llap.tez.LlapProtocolClientProxy;
+import org.apache.hadoop.hive.llap.tezplugins.helpers.LlapTaskUmbilicalServer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.impl.EventType;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
+import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class LlapTaskUmbilicalExternalClient extends AbstractService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LlapTaskUmbilicalExternalClient.class);
+
+ private final LlapProtocolClientProxy communicator;
+ private volatile LlapTaskUmbilicalServer llapTaskUmbilicalServer;
+ private final Configuration conf;
+ private final LlapTaskUmbilicalProtocol umbilical;
+
+ protected final String tokenIdentifier;
+ protected final Token<JobTokenIdentifier> sessionToken;
+
+ private final ConcurrentMap<String, PendingEventData> pendingEvents = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, TaskHeartbeatInfo> registeredTasks= new ConcurrentHashMap<String, TaskHeartbeatInfo>();
+ private LlapTaskUmbilicalExternalResponder responder = null;
+ private final ScheduledThreadPoolExecutor timer;
+ private final long connectionTimeout;
+
+   /**
+    * Liveness record for one task attempt: the attempt id, the LLAP daemon (host/port) it was
+    * submitted to, and the wall-clock time at which a heartbeat was last recorded for it.
+    */
+   private static class TaskHeartbeatInfo {
+     final String taskAttemptId;
+     final String hostname;
+     final int port;
+     final AtomicLong lastHeartbeat;
+
+     public TaskHeartbeatInfo(String taskAttemptId, String hostname, int port) {
+       this.taskAttemptId = taskAttemptId;
+       this.hostname = hostname;
+       this.port = port;
+       // A freshly created entry is treated as if it had just heartbeated.
+       this.lastHeartbeat = new AtomicLong(System.currentTimeMillis());
+     }
+   }
+
+   /**
+    * Events held for a submitted fragment until its first heartbeat is received, together with
+    * the liveness record of that fragment.
+    */
+   private static class PendingEventData {
+     final TaskHeartbeatInfo heartbeatInfo;
+     final List<TezEvent> tezEvents;
+
+     public PendingEventData(TaskHeartbeatInfo info, List<TezEvent> events) {
+       heartbeatInfo = info;
+       tezEvents = events;
+     }
+   }
+
+ // TODO KKK Work out the details of the tokenIdentifier, and the session token.
+ // It may just be possible to create one here - since Shuffle is not involved, and this is only used
+ // for communication from LLAP-Daemons to the server. It will need to be sent in as part
+ // of the job submission request.
+ /**
+ * Creates the client and initializes (but does not start) the daemon communicator.
+ * The umbilical server itself is only created in {@link #serviceStart()}.
+ *
+ * @param conf configuration; also supplies the heartbeat timeout
+ * (LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS, read in milliseconds)
+ * @param tokenIdentifier passed to the umbilical server when it is created
+ * @param sessionToken passed to the umbilical server when it is created
+ * @param responder callback target for submission failures, heartbeats, kills and timeouts
+ */
+ public LlapTaskUmbilicalExternalClient(Configuration conf, String tokenIdentifier,
+ Token<JobTokenIdentifier> sessionToken, LlapTaskUmbilicalExternalResponder responder) {
+ super(LlapTaskUmbilicalExternalClient.class.getName());
+ this.conf = conf;
+ this.umbilical = new LlapTaskUmbilicalExternalImpl();
+ this.tokenIdentifier = tokenIdentifier;
+ this.sessionToken = sessionToken;
+ this.responder = responder;
+ // Single-threaded timer that runs the periodic heartbeat timeout checks.
+ this.timer = new ScheduledThreadPoolExecutor(1);
+ this.connectionTimeout = HiveConf.getTimeVar(conf,
+ HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ // TODO. No support for the LLAP token yet. Add support for configurable threads, however 1 should always be enough.
+ this.communicator = new LlapProtocolClientProxy(1, conf, null);
+ this.communicator.init(conf);
+ }
+
+   /**
+    * Creates the umbilical server that LLAP daemons report back to and starts the daemon
+    * communicator.
+    */
+   @Override
+   public void serviceStart() throws IOException {
+     // One handler is used; consider more if a single server ends up shared by multiple
+     // external clients.
+     final int handlerCount = 1;
+     llapTaskUmbilicalServer =
+         new LlapTaskUmbilicalServer(conf, umbilical, handlerCount, tokenIdentifier, sessionToken);
+     communicator.start();
+   }
+
+ @Override
+ public void serviceStop() {
+ llapTaskUmbilicalServer.shutdownServer();
+ timer.shutdown();
+ if (this.communicator != null) {
+ this.communicator.stop();
+ }
+ }
+
+   /**
+    * @return the address reported by the umbilical server created in {@link #serviceStart()}
+    */
+   public InetSocketAddress getAddress() {
+     final LlapTaskUmbilicalServer server = llapTaskUmbilicalServer;
+     return server.getAddress();
+   }
+
+
+ /**
+ * Submit the work for actual execution.
+ * <p>
+ * The supplied events are stored under the fragment's task attempt id (derived from the vertex
+ * identifier, fragment number and attempt number of the request) and are handed to the task in
+ * the response to its first heartbeat. The request is then sent to the daemon asynchronously;
+ * a rejection or a send failure is reported through the responder.
+ *
+ * @param submitWorkRequestProto the request to forward to the LLAP daemon
+ * @param llapHost host of the LLAP daemon the request is sent to
+ * @param llapPort port of the LLAP daemon the request is sent to
+ * @param tezEvents events to return to the task on its first heartbeat
+ */
+ public void submitWork(final SubmitWorkRequestProto submitWorkRequestProto, String llapHost, int llapPort, List<TezEvent> tezEvents) {
+ // Register the pending events to be sent for this spec.
- String fragmentId = submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString();
++ SignableVertexSpec vertex = submitWorkRequestProto.getWorkSpec().getVertex();
++ VertexIdentifier vId = vertex.getVertexIdentifier();
++ TezTaskAttemptID attemptId = Converters.createTaskAttemptId(
++ vId, submitWorkRequestProto.getFragmentNumber(), submitWorkRequestProto.getAttemptNumber());
++ final String fragmentId = attemptId.toString();
++
+ PendingEventData pendingEventData = new PendingEventData(
+ new TaskHeartbeatInfo(fragmentId, llapHost, llapPort),
+ tezEvents);
+ // NOTE(review): putIfAbsent keeps an existing entry, so events passed for an already-pending
+ // fragmentId are not stored.
+ pendingEvents.putIfAbsent(fragmentId, pendingEventData);
+
+ // Setup timer task to check for heartbeat timeouts
+ // NOTE(review): a new periodic HeartbeatCheckTask is scheduled on every submitWork() call and
+ // nothing in this method cancels it.
+ timer.scheduleAtFixedRate(new HeartbeatCheckTask(),
+ connectionTimeout, connectionTimeout, TimeUnit.MILLISECONDS);
+
+ // Send out the actual SubmitWorkRequest
+ communicator.sendSubmitWork(submitWorkRequestProto, llapHost, llapPort,
+ new LlapProtocolClientProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.SubmitWorkResponseProto>() {
+
+ @Override
+ public void setResponse(LlapDaemonProtocolProtos.SubmitWorkResponseProto response) {
+ // Only a REJECTED submission state is acted upon; any other response is ignored here.
+ if (response.hasSubmissionState()) {
+ if (response.getSubmissionState().equals(LlapDaemonProtocolProtos.SubmissionStateProto.REJECTED)) {
- String msg = "Fragment: " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString() + " rejected. Server Busy.";
++ String msg = "Fragment: " + fragmentId + " rejected. Server Busy.";
+ LOG.info(msg);
+ if (responder != null) {
+ Throwable err = new RuntimeException(msg);
- responder.submissionFailed(submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString(), err);
++ responder.submissionFailed(fragmentId, err);
+ }
+ return;
+ }
+ }
+ }
+
+ @Override
+ public void indicateError(Throwable t) {
- String msg = "Failed to submit: " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString();
++ String msg = "Failed to submit: " + fragmentId;
+ LOG.error(msg, t);
+ Throwable err = new RuntimeException(msg, t);
+ // NOTE(review): unlike setResponse(), responder is not null-checked before this call.
- responder.submissionFailed(submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString(), err);
++ responder.submissionFailed(fragmentId, err);
+ }
+ });
+
+
+
+
+
+// // TODO Also send out information saying that the fragment is finishable - if that is not already included in the main fragment.
+// // This entire call is only required if we're doing more than scans. MRInput has no dependencies and is always finishable
+// QueryIdentifierProto queryIdentifier = QueryIdentifierProto
+// .newBuilder()
+// .setAppIdentifier(submitWorkRequestProto.getApplicationIdString()).setDagIdentifier(submitWorkRequestProto.getFragmentSpec().getDagId())
+// .build();
+// LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto sourceStateUpdatedRequest =
+// LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto.newBuilder().setQueryIdentifier(queryIdentifier).setState(
+// LlapDaemonProtocolProtos.SourceStateProto.S_SUCCEEDED).
+// setSrcName(TODO)
+// communicator.sendSourceStateUpdate(LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto.newBuilder().setQueryIdentifier(submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString()).set);
+
+
+ }
+
+ private void updateHeartbeatInfo(String taskAttemptId) {
+ int updateCount = 0;
+
+ PendingEventData pendingEventData = pendingEvents.get(taskAttemptId);
+ if (pendingEventData != null) {
+ pendingEventData.heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis());
+ updateCount++;
+ }
+
+ TaskHeartbeatInfo heartbeatInfo = registeredTasks.get(taskAttemptId);
+ if (heartbeatInfo != null) {
+ heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis());
+ updateCount++;
+ }
+
+ if (updateCount == 0) {
+ LOG.warn("No tasks found for heartbeat from taskAttemptId " + taskAttemptId);
+ }
+ }
+
+ private void updateHeartbeatInfo(String hostname, int port) {
+ int updateCount = 0;
+
+ for (String key : pendingEvents.keySet()) {
+ PendingEventData pendingEventData = pendingEvents.get(key);
+ if (pendingEventData != null) {
+ if (pendingEventData.heartbeatInfo.hostname.equals(hostname)
+ && pendingEventData.heartbeatInfo.port == port) {
+ pendingEventData.heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis());
+ updateCount++;
+ }
+ }
+ }
+
+ for (String key : registeredTasks.keySet()) {
+ TaskHeartbeatInfo heartbeatInfo = registeredTasks.get(key);
+ if (heartbeatInfo != null) {
+ if (heartbeatInfo.hostname.equals(hostname)
+ && heartbeatInfo.port == port) {
+ heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis());
+ updateCount++;
+ }
+ }
+ }
+
+ if (updateCount == 0) {
+ LOG.info("No tasks found for heartbeat from hostname " + hostname + ", port " + port);
+ }
+ }
+
+ private class HeartbeatCheckTask implements Runnable {
+ public void run() {
+ long currentTime = System.currentTimeMillis();
+ List<String> timedOutTasks = new ArrayList<String>();
+
+ // Check both pending and registered tasks for timeouts
+ for (String key : pendingEvents.keySet()) {
+ PendingEventData pendingEventData = pendingEvents.get(key);
+ if (pendingEventData != null) {
+ if (currentTime - pendingEventData.heartbeatInfo.lastHeartbeat.get() >= connectionTimeout) {
+ timedOutTasks.add(key);
+ }
+ }
+ }
+ for (String timedOutTask : timedOutTasks) {
+ LOG.info("Pending taskAttemptId " + timedOutTask + " timed out");
+ responder.heartbeatTimeout(timedOutTask);
+ pendingEvents.remove(timedOutTask);
+ // TODO: Do we need to tell the LLAP daemon we are no longer interested in this task?
+ }
+
+ timedOutTasks.clear();
+ for (String key : registeredTasks.keySet()) {
+ TaskHeartbeatInfo heartbeatInfo = registeredTasks.get(key);
+ if (heartbeatInfo != null) {
+ if (currentTime - heartbeatInfo.lastHeartbeat.get() >= connectionTimeout) {
+ timedOutTasks.add(key);
+ }
+ }
+ }
+ for (String timedOutTask : timedOutTasks) {
+ LOG.info("Running taskAttemptId " + timedOutTask + " timed out");
+ responder.heartbeatTimeout(timedOutTask);
+ registeredTasks.remove(timedOutTask);
+ // TODO: Do we need to tell the LLAP daemon we are no longer interested in this task?
+ }
+ }
+ }
+
+ /**
+ * Callbacks through which this client reports fragment events to its owner.
+ */
+ public interface LlapTaskUmbilicalExternalResponder {
+ /** Invoked when the daemon rejects the submission or sending the submit request fails. */
+ void submissionFailed(String fragmentId, Throwable throwable);
+ /** Invoked with each heartbeat request received for a pending or registered task attempt. */
+ void heartbeat(TezHeartbeatRequest request);
+ /** Invoked when the umbilical is told that the given task attempt was killed. */
+ void taskKilled(TezTaskAttemptID taskAttemptId);
+ /** Invoked when no heartbeat was recorded for the task within the connection timeout. */
+ void heartbeatTimeout(String fragmentId);
+ }
+
+
+
+ // TODO Ideally, the server should be shared across all client sessions running on the same node.
+ /**
+ * Umbilical protocol implementation handed to the umbilical server. It keeps the pending and
+ * registered task maps of the enclosing client up to date and forwards heartbeats and kill
+ * notifications to the responder.
+ */
+ private class LlapTaskUmbilicalExternalImpl implements LlapTaskUmbilicalProtocol {
+
+ /** Always allows the commit. */
+ @Override
+ public boolean canCommit(TezTaskAttemptID taskid) throws IOException {
+ // Expecting only a single instance of a task to be running.
+ return true;
+ }
+
+ /**
+ * Handles a task heartbeat: refreshes the task's liveness timestamp, returns the pending
+ * events on the first heartbeat (moving the task from pending to registered), drops the task
+ * from the registered map on completed/failed events, and forwards the request to the
+ * responder. A heartbeat from a task that is neither pending nor registered is answered
+ * with a "should die" response and is not forwarded.
+ */
+ @Override
+ public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOException,
+ TezException {
+ // Keep-alive information. The client should be informed and will have to take care of re-submitting the work.
+ // Some parts of fault tolerance go here.
+
+ // This also provides completion information, and a possible notification when task actually starts running (first heartbeat)
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received heartbeat from container, request=" + request);
+ }
+
+ // Incoming events can be ignored until the point when shuffle needs to be handled, instead of just scans.
+ TezHeartbeatResponse response = new TezHeartbeatResponse();
+
+ response.setLastRequestId(request.getRequestId());
+ // Assuming TaskAttemptId and FragmentIdentifierString are the same. Verify this.
+ TezTaskAttemptID taskAttemptId = request.getCurrentTaskAttemptID();
+ String taskAttemptIdString = taskAttemptId.toString();
+
+ updateHeartbeatInfo(taskAttemptIdString);
+
+ List<TezEvent> tezEvents = null;
+ // Removing the pending entry means its events are handed out exactly once.
+ PendingEventData pendingEventData = pendingEvents.remove(taskAttemptIdString);
+ if (pendingEventData == null) {
+ tezEvents = Collections.emptyList();
+
+ // If this heartbeat was not from a pending event and it's not in our list of registered tasks,
+ if (!registeredTasks.containsKey(taskAttemptIdString)) {
+ LOG.info("Unexpected heartbeat from " + taskAttemptIdString);
+ response.setShouldDie(); // Do any of the other fields need to be set?
+ return response;
+ }
+ } else {
+ tezEvents = pendingEventData.tezEvents;
+ // Tasks removed from the pending list should then be added to the registered list.
+ registeredTasks.put(taskAttemptIdString, pendingEventData.heartbeatInfo);
+ }
+
+ // NOTE(review): the last request id was already set with the same value above.
+ response.setLastRequestId(request.getRequestId());
+ // Irrelevant from eventIds. This can be tracked in the AM itself, instead of polluting the task.
+ // Also since we have all the MRInput events here - they'll all be sent in together.
+ response.setNextFromEventId(0); // Irrelevant. See comment above.
+ response.setNextPreRoutedEventId(0); //Irrelevant. See comment above.
+ response.setEvents(tezEvents);
+
+ List<TezEvent> inEvents = request.getEvents();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Heartbeat from " + taskAttemptIdString +
+ " events: " + (inEvents != null ? inEvents.size() : -1));
+ }
+ for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
+ EventType eventType = tezEvent.getEventType();
+ switch (eventType) {
+ case TASK_ATTEMPT_COMPLETED_EVENT:
+ LOG.debug("Task completed event for " + taskAttemptIdString);
+ registeredTasks.remove(taskAttemptIdString);
+ break;
+ case TASK_ATTEMPT_FAILED_EVENT:
+ LOG.debug("Task failed event for " + taskAttemptIdString);
+ registeredTasks.remove(taskAttemptIdString);
+ break;
+ case TASK_STATUS_UPDATE_EVENT:
+ // If we want to handle counters
+ LOG.debug("Task update event for " + taskAttemptIdString);
+ break;
+ default:
+ LOG.warn("Unhandled event type " + eventType);
+ break;
+ }
+ }
+
+ // Pass the request on to the responder
+ try {
+ if (responder != null) {
+ responder.heartbeat(request);
+ }
+ } catch (Exception err) {
+ // A failing responder must not prevent the response from being returned to the task.
+ LOG.error("Error during responder execution", err);
+ }
+
+ return response;
+ }
+
+ /** Refreshes the liveness timestamp of every task submitted to the given daemon host and port. */
+ @Override
+ public void nodeHeartbeat(Text hostname, int port) throws IOException {
+ updateHeartbeatInfo(hostname.toString(), port);
+ // No need to propagate this to the responder
+ }
+
+ /** Drops the killed task from the registered map and notifies the responder, if any. */
+ @Override
+ public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException {
+ String taskAttemptIdString = taskAttemptId.toString();
+ LOG.error("Task killed - " + taskAttemptIdString);
+ registeredTasks.remove(taskAttemptIdString);
+
+ try {
+ if (responder != null) {
+ responder.taskKilled(taskAttemptId);
+ }
+ } catch (Exception err) {
+ LOG.error("Error during responder execution", err);
+ }
+ }
+
+ /** Always reports protocol version 0. */
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
+ return 0;
+ }
+
+ /** Delegates to {@code ProtocolSignature.getProtocolSignature} for this implementation. */
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
+ int clientMethodsHash) throws IOException {
+ return ProtocolSignature.getProtocolSignature(this, protocol,
+ clientVersion, clientMethodsHash);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/e0579097/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
----------------------------------------------------------------------
diff --cc llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
index 8db2f88,0000000..988002f
mode 100644,000000..100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
@@@ -1,476 -1,0 +1,480 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import java.sql.SQLException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.sql.DriverManager;
+
+import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataInputStream;
+import java.io.ByteArrayInputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.collections4.ListUtils;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.LlapBaseRecordReader;
+import org.apache.hadoop.hive.llap.LlapBaseRecordReader.ReaderEvent;
+import org.apache.hadoop.hive.llap.LlapInputSplit;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
++import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
++import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrBinary;
+import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient;
+import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient.LlapTaskUmbilicalExternalResponder;
+import org.apache.hadoop.hive.llap.registry.ServiceInstance;
+import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
+import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
+import org.apache.hadoop.hive.llap.tez.Converters;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
+import org.apache.hadoop.mapred.SplitLocationInfo;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.security.TokenCache;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
+import org.apache.tez.runtime.api.impl.EventType;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
+import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+
+
+public class LlapBaseInputFormat<V extends WritableComparable> implements InputFormat<NullWritable, V> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LlapBaseInputFormat.class);
+
+ private static String driverName = "org.apache.hive.jdbc.HiveDriver";
+ private String url; // "jdbc:hive2://localhost:10000/default"
+ private String user; // "hive",
+ private String pwd; // ""
+ private String query;
+
+ public static final String URL_KEY = "llap.if.hs2.connection";
+ public static final String QUERY_KEY = "llap.if.query";
+ public static final String USER_KEY = "llap.if.user";
+ public static final String PWD_KEY = "llap.if.pwd";
+
+ public final String SPLIT_QUERY = "select get_splits(\"%s\",%d)";
+
+ private Connection con;
+ private Statement stmt;
+
+ /**
+ * Creates an input format with explicit connection settings. Any value left null here is
+ * looked up in the JobConf by getSplits() (URL_KEY, USER_KEY, PWD_KEY, QUERY_KEY).
+ *
+ * @param url JDBC URL passed to DriverManager.getConnection, e.g. "jdbc:hive2://localhost:10000/default"
+ * @param user user name for the JDBC connection
+ * @param pwd password for the JDBC connection
+ * @param query query whose splits are requested through get_splits
+ */
+ public LlapBaseInputFormat(String url, String user, String pwd, String query) {
+ this.url = url;
+ this.user = user;
+ this.pwd = pwd;
+ this.query = query;
+ }
+
+ /** Creates an input format whose connection settings are all read from the JobConf in getSplits(). */
+ public LlapBaseInputFormat() {}
+
+
+   /**
+    * Submits the fragment carried by the split to the LLAP daemon registered for the split's
+    * first location, then connects to that daemon's output-format port, registers the
+    * "queryId_splitNum" id on the socket and returns a reader over the socket's input stream.
+    *
+    * @param split must be an {@link LlapInputSplit}
+    * @param job job configuration; the LLAP ZK registry user is overwritten with the split's user
+    * @param reporter not used
+    * @throws IOException if no service instance is found, or if submission setup, the socket
+    *     connection or the id registration fails
+    */
+   @Override
+   public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
+
+     LlapInputSplit llapSplit = (LlapInputSplit) split;
+
+     // Set conf to use LLAP user rather than current user for LLAP Zk registry.
+     HiveConf.setVar(job, HiveConf.ConfVars.LLAP_ZK_REGISTRY_USER, llapSplit.getLlapUser());
+     SubmitWorkInfo submitWorkInfo = SubmitWorkInfo.fromBytes(llapSplit.getPlanBytes());
+
+     ServiceInstance serviceInstance = getServiceInstance(job, llapSplit);
+     String host = serviceInstance.getHost();
+     int llapSubmitPort = serviceInstance.getRpcPort();
+
+     LOG.info("Found service instance for host " + host + " with rpc port " + llapSubmitPort
+         + " and outputformat port " + serviceInstance.getOutputFormatPort());
+
+     LlapRecordReaderTaskUmbilicalExternalResponder umbilicalResponder =
+         new LlapRecordReaderTaskUmbilicalExternalResponder();
+     LlapTaskUmbilicalExternalClient llapClient =
+         new LlapTaskUmbilicalExternalClient(job, submitWorkInfo.getTokenIdentifier(),
+             submitWorkInfo.getToken(), umbilicalResponder);
+     llapClient.init(job);
+     llapClient.start();
+
+     SubmitWorkRequestProto submitWorkRequestProto =
+         constructSubmitWorkRequestProto(submitWorkInfo, llapSplit.getSplitNum(),
+             llapClient.getAddress(), submitWorkInfo.getToken());
+
+     // The split carries one serialized TezEvent; it is delivered to the task on its first heartbeat.
+     TezEvent tezEvent = new TezEvent();
+     DataInputBuffer dib = new DataInputBuffer();
+     dib.reset(llapSplit.getFragmentBytes(), 0, llapSplit.getFragmentBytes().length);
+     tezEvent.readFields(dib);
+     List<TezEvent> tezEventList = Lists.newArrayList();
+     tezEventList.add(tezEvent);
+
+     llapClient.submitWork(submitWorkRequestProto, host, llapSubmitPort, tezEventList);
+
+     String id = HiveConf.getVar(job, HiveConf.ConfVars.HIVEQUERYID) + "_" + llapSplit.getSplitNum();
+
+     Socket socket = new Socket(host,
+         serviceInstance.getOutputFormatPort());
+     try {
+       LOG.debug("Socket connected");
+
+       // The id is sent as bytes followed by a single 0 byte.
+       socket.getOutputStream().write(id.getBytes());
+       socket.getOutputStream().write(0);
+       socket.getOutputStream().flush();
+
+       LOG.info("Registered id: " + id);
+
+       LlapBaseRecordReader recordReader = new LlapBaseRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class);
+       umbilicalResponder.setRecordReader(recordReader);
+       return recordReader;
+     } catch (IOException | RuntimeException e) {
+       // Nobody else holds the socket yet, so it must be closed here or it leaks.
+       // NOTE(review): llapClient is not stopped on this failure path - confirm who owns its lifecycle.
+       try {
+         socket.close();
+       } catch (IOException closeError) {
+         e.addSuppressed(closeError);
+       }
+       throw e;
+     }
+   }
+
+   /**
+    * Asks HiveServer2 (over JDBC) for the splits of the configured query by running
+    * {@code select get_splits("<query>", <numSplits>)} and deserializing one
+    * {@link LlapInputSplit} from the first column of every result row.
+    * <p>
+    * Connection settings that were not passed to the constructor are read from the JobConf.
+    * The JDBC connection is kept open in a field; call {@link #close()} to release it.
+    *
+    * @throws IllegalStateException if neither the constructor nor the JobConf supplies both the
+    *     connection URL and the query
+    * @throws IOException if the JDBC driver cannot be loaded or anything fails while querying or
+    *     deserializing the splits
+    */
+   @Override
+   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+     List<InputSplit> ins = new ArrayList<InputSplit>();
+
+     if (url == null) {
+       url = job.get(URL_KEY);
+     }
+     if (query == null) {
+       query = job.get(QUERY_KEY);
+     }
+     if (user == null) {
+       user = job.get(USER_KEY);
+     }
+     if (pwd == null) {
+       pwd = job.get(PWD_KEY);
+     }
+
+     if (url == null || query == null) {
+       throw new IllegalStateException("Both the connection URL (" + URL_KEY
+           + ") and the query (" + QUERY_KEY + ") must be provided");
+     }
+
+     try {
+       Class.forName(driverName);
+     } catch (ClassNotFoundException e) {
+       throw new IOException(e);
+     }
+
+     try {
+       con = DriverManager.getConnection(url, user, pwd);
+       stmt = con.createStatement();
+       // NOTE(review): the query text is spliced into a double-quoted SQL literal without
+       // escaping, so a query containing '"' produces a different statement than intended.
+       String sql = String.format(SPLIT_QUERY, query, numSplits);
+       // try-with-resources closes the result set and then the statement even when reading or
+       // deserializing a split fails.
+       try (Statement statement = stmt; ResultSet res = statement.executeQuery(sql)) {
+         while (res.next()) {
+           // deserialize split
+           DataInput in = new DataInputStream(res.getBinaryStream(1));
+           InputSplitWithLocationInfo is = new LlapInputSplit();
+           is.readFields(in);
+           ins.add(is);
+         }
+       }
+     } catch (Exception e) {
+       throw new IOException(e);
+     }
+     return ins.toArray(new InputSplit[ins.size()]);
+   }
+
+   /**
+    * Closes the JDBC connection opened by getSplits(), if there is one. Failures are logged and
+    * otherwise ignored: closing is best effort and never throws.
+    */
+   public void close() {
+     if (con == null) {
+       // getSplits() was never called (or failed before connecting); nothing to release.
+       return;
+     }
+     try {
+       con.close();
+     } catch (Exception e) {
+       LOG.debug("Ignoring failure while closing the JDBC connection", e);
+     }
+   }
+
+ private ServiceInstance getServiceInstance(JobConf job, LlapInputSplit llapSplit) throws IOException {
+ LlapRegistryService registryService = LlapRegistryService.getClient(job);
+ String host = llapSplit.getLocations()[0];
+
+ ServiceInstance serviceInstance = getServiceInstanceForHost(registryService, host);
+ if (serviceInstance == null) {
+ throw new IOException("No service instances found for " + host + " in registry");
+ }
+
+ return serviceInstance;
+ }
+
+ private ServiceInstance getServiceInstanceForHost(LlapRegistryService registryService, String host) throws IOException {
+ InetAddress address = InetAddress.getByName(host);
+ ServiceInstanceSet instanceSet = registryService.getInstances();
+ ServiceInstance serviceInstance = null;
+
+ // The name used in the service registry may not match the host name we're using.
+ // Try hostname/canonical hostname/host address
+
+ String name = address.getHostName();
+ LOG.info("Searching service instance by hostname " + name);
+ serviceInstance = selectServiceInstance(instanceSet.getByHost(name));
+ if (serviceInstance != null) {
+ return serviceInstance;
+ }
+
+ name = address.getCanonicalHostName();
+ LOG.info("Searching service instance by canonical hostname " + name);
+ serviceInstance = selectServiceInstance(instanceSet.getByHost(name));
+ if (serviceInstance != null) {
+ return serviceInstance;
+ }
+
+ name = address.getHostAddress();
+ LOG.info("Searching service instance by address " + name);
+ serviceInstance = selectServiceInstance(instanceSet.getByHost(name));
+ if (serviceInstance != null) {
+ return serviceInstance;
+ }
+
+ return serviceInstance;
+ }
+
+   /**
+    * Picks the first instance of the set that reports itself alive.
+    *
+    * @return that instance, or null when the set is null, empty or contains no live instance
+    *     (only the last case is logged)
+    */
+   private ServiceInstance selectServiceInstance(Set<ServiceInstance> serviceInstances) {
+     if (serviceInstances != null && !serviceInstances.isEmpty()) {
+       for (ServiceInstance candidate : serviceInstances) {
+         if (candidate.isAlive()) {
+           return candidate;
+         }
+       }
+       LOG.info("No live service instances were found");
+     }
+     return null;
+   }
+
+ private SubmitWorkRequestProto constructSubmitWorkRequestProto(SubmitWorkInfo submitWorkInfo,
+ int taskNum,
+ InetSocketAddress address,
+ Token<JobTokenIdentifier> token) throws
+ IOException {
+ TaskSpec taskSpec = submitWorkInfo.getTaskSpec();
+ ApplicationId appId = submitWorkInfo.getFakeAppId();
+
- SubmitWorkRequestProto.Builder builder = SubmitWorkRequestProto.newBuilder();
++ int attemptId = taskSpec.getTaskAttemptID().getId();
+ // This works, assuming the executor is running within YARN.
- LOG.info("Setting user in submitWorkRequest to: " +
- System.getenv(ApplicationConstants.Environment.USER.name()));
- builder.setUser(System.getenv(ApplicationConstants.Environment.USER.name()));
- builder.setApplicationIdString(appId.toString());
- builder.setAppAttemptNumber(0);
- builder.setTokenIdentifier(appId.toString());
++ String user = System.getenv(ApplicationConstants.Environment.USER.name());
++ LOG.info("Setting user in submitWorkRequest to: " + user);
++ SignableVertexSpec svs = Converters.convertTaskSpecToProto(
++ taskSpec, attemptId, appId.toString(), null, user); // TODO signatureKeyId
+
+ ContainerId containerId =
+ ContainerId.newInstance(ApplicationAttemptId.newInstance(appId, 0), taskNum);
- builder.setContainerIdString(containerId.toString());
+
- builder.setAmHost(address.getHostName());
- builder.setAmPort(address.getPort());
++
+ Credentials taskCredentials = new Credentials();
+ // Credentials can change across DAGs. Ideally construct only once per DAG.
+ // TODO Figure out where credentials will come from. Normally Hive sets up
+ // URLs on the tez dag, for which Tez acquires credentials.
+
+ // taskCredentials.addAll(getContext().getCredentials());
+
+ // Preconditions.checkState(currentQueryIdentifierProto.getDagIdentifier() ==
+ // taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId());
+ // ByteBuffer credentialsBinary = credentialMap.get(currentQueryIdentifierProto);
+ // if (credentialsBinary == null) {
+ // credentialsBinary = serializeCredentials(getContext().getCredentials());
+ // credentialMap.putIfAbsent(currentQueryIdentifierProto, credentialsBinary.duplicate());
+ // } else {
+ // credentialsBinary = credentialsBinary.duplicate();
+ // }
+ // builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary));
+ Credentials credentials = new Credentials();
+ TokenCache.setSessionToken(token, credentials);
+ ByteBuffer credentialsBinary = serializeCredentials(credentials);
- builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary));
-
-
- builder.setFragmentSpec(Converters.convertTaskSpecToProto(taskSpec));
+
+ FragmentRuntimeInfo.Builder runtimeInfo = FragmentRuntimeInfo.newBuilder();
+ runtimeInfo.setCurrentAttemptStartTime(System.currentTimeMillis());
+ runtimeInfo.setWithinDagPriority(0);
+ runtimeInfo.setDagStartTime(submitWorkInfo.getCreationTime());
+ runtimeInfo.setFirstAttemptStartTime(submitWorkInfo.getCreationTime());
+ runtimeInfo.setNumSelfAndUpstreamTasks(taskSpec.getVertexParallelism());
+ runtimeInfo.setNumSelfAndUpstreamCompletedTasks(0);
+
++ SubmitWorkRequestProto.Builder builder = SubmitWorkRequestProto.newBuilder();
+
++ builder.setWorkSpec(VertexOrBinary.newBuilder().setVertex(svs).build());
++ // TODO work spec signature
++ builder.setFragmentNumber(taskSpec.getTaskAttemptID().getTaskID().getId());
++ builder.setAttemptNumber(0);
++ builder.setContainerIdString(containerId.toString());
++ builder.setAmHost(address.getHostName());
++ builder.setAmPort(address.getPort());
++ builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary));
+ builder.setFragmentRuntimeInfo(runtimeInfo.build());
++
+ return builder.build();
+ }
+
+ private ByteBuffer serializeCredentials(Credentials credentials) throws IOException {
+ Credentials containerCredentials = new Credentials();
+ containerCredentials.addAll(credentials);
+ DataOutputBuffer containerTokens_dob = new DataOutputBuffer();
+ containerCredentials.writeTokenStorageToStream(containerTokens_dob);
+ return ByteBuffer.wrap(containerTokens_dob.getData(), 0, containerTokens_dob.getLength());
+ }
+
+ private static class LlapRecordReaderTaskUmbilicalExternalResponder implements LlapTaskUmbilicalExternalResponder {
+ protected LlapBaseRecordReader recordReader = null;
+ protected LinkedBlockingQueue<ReaderEvent> queuedEvents = new LinkedBlockingQueue<ReaderEvent>();
+
+ public LlapRecordReaderTaskUmbilicalExternalResponder() {
+ }
+
+ @Override
+ public void submissionFailed(String fragmentId, Throwable throwable) {
+ try {
+ sendOrQueueEvent(ReaderEvent.errorEvent(
+ "Received submission failed event for fragment ID " + fragmentId));
+ } catch (Exception err) {
+ LOG.error("Error during heartbeat responder:", err);
+ }
+ }
+
+ @Override
+ public void heartbeat(TezHeartbeatRequest request) {
+ TezTaskAttemptID taskAttemptId = request.getCurrentTaskAttemptID();
+ List<TezEvent> inEvents = request.getEvents();
+ for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
+ EventType eventType = tezEvent.getEventType();
+ try {
+ switch (eventType) {
+ case TASK_ATTEMPT_COMPLETED_EVENT:
+ sendOrQueueEvent(ReaderEvent.doneEvent());
+ break;
+ case TASK_ATTEMPT_FAILED_EVENT:
+ TaskAttemptFailedEvent taskFailedEvent = (TaskAttemptFailedEvent) tezEvent.getEvent();
+ sendOrQueueEvent(ReaderEvent.errorEvent(taskFailedEvent.getDiagnostics()));
+ break;
+ case TASK_STATUS_UPDATE_EVENT:
+ // If we want to handle counters
+ break;
+ default:
+ LOG.warn("Unhandled event type " + eventType);
+ break;
+ }
+ } catch (Exception err) {
+ LOG.error("Error during heartbeat responder:", err);
+ }
+ }
+ }
+
+ @Override
+ public void taskKilled(TezTaskAttemptID taskAttemptId) {
+ try {
+ sendOrQueueEvent(ReaderEvent.errorEvent(
+ "Received task killed event for task ID " + taskAttemptId));
+ } catch (Exception err) {
+ LOG.error("Error during heartbeat responder:", err);
+ }
+ }
+
+ @Override
+ public void heartbeatTimeout(String taskAttemptId) {
+ try {
+ sendOrQueueEvent(ReaderEvent.errorEvent(
+ "Timed out waiting for heartbeat for task ID " + taskAttemptId));
+ } catch (Exception err) {
+ LOG.error("Error during heartbeat responder:", err);
+ }
+ }
+
+ public synchronized LlapBaseRecordReader getRecordReader() {
+ return recordReader;
+ }
+
+ public synchronized void setRecordReader(LlapBaseRecordReader recordReader) {
+ this.recordReader = recordReader;
+
+ if (recordReader == null) {
+ return;
+ }
+
+ // If any events were queued by the responder, give them to the record reader now.
+ while (!queuedEvents.isEmpty()) {
+ ReaderEvent readerEvent = queuedEvents.poll();
+ LOG.debug("Sending queued event to record reader: " + readerEvent.getEventType());
+ recordReader.handleEvent(readerEvent);
+ }
+ }
+
+ /**
+ * Send the ReaderEvents to the record reader, if it is registered to this responder.
+ * If there is no registered record reader, add them to a list of pending reader events
+ * since we don't want to drop these events.
+ * @param readerEvent
+ */
+ protected synchronized void sendOrQueueEvent(ReaderEvent readerEvent) {
+ LlapBaseRecordReader recordReader = getRecordReader();
+ if (recordReader != null) {
+ recordReader.handleEvent(readerEvent);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No registered record reader, queueing event " + readerEvent.getEventType()
+ + " with message " + readerEvent.getMessage());
+ }
+
+ try {
+ queuedEvents.put(readerEvent);
+ } catch (Exception err) {
+ throw new RuntimeException("Unexpected exception while queueing reader event", err);
+ }
+ }
+ }
+
+ /**
+ * Clear the list of queued reader events if we are not interested in sending any pending events to any registering record reader.
+ */
+ public void clearQueuedEvents() {
+ queuedEvents.clear();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/e0579097/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --cc llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index d8367ce,2bfe3ed..2524dc2
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@@ -263,13 -267,12 +267,12 @@@ public class ContainerRunnerImpl extend
new QueryIdentifier(request.getQueryIdentifier().getAppIdentifier(),
request.getQueryIdentifier().getDagIdentifier());
LOG.info("Processing queryComplete notification for {}", queryIdentifier);
- List<QueryFragmentInfo> knownFragments =
- queryTracker
- .queryComplete(queryIdentifier, request.getDeleteDelay());
- LOG.info("Pending fragment count for completed query {} = {}", queryIdentifier,
+ List<QueryFragmentInfo> knownFragments = queryTracker.queryComplete(
+ queryIdentifier, request.getDeleteDelay(), false);
+ LOG.info("DBG: Pending fragment count for completed query {} = {}", queryIdentifier,
knownFragments.size());
for (QueryFragmentInfo fragmentInfo : knownFragments) {
- LOG.info("DBG: Issuing killFragment for completed query {} {}", queryIdentifier,
+ LOG.info("Issuing killFragment for completed query {} {}", queryIdentifier,
fragmentInfo.getFragmentIdentifierString());
executorService.killFragment(fragmentInfo.getFragmentIdentifierString());
}
http://git-wip-us.apache.org/repos/asf/hive/blob/e0579097/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/e0579097/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --cc llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 4a33373,3093de7..8594ee1
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@@ -134,15 -135,20 +135,18 @@@ public class TaskRunnerCallable extend
this.memoryAvailable = memoryAvailable;
this.confParams = confParams;
this.jobToken = TokenCache.getSessionToken(credentials);
- this.taskSpec = Converters.getTaskSpecfromProto(request.getFragmentSpec());
+ // TODO: support binary spec here or above
+ this.vertex = request.getWorkSpec().getVertex();
+ this.taskSpec = Converters.getTaskSpecfromProto(
+ vertex, request.getFragmentNumber(), request.getAttemptNumber(), attemptId);
this.amReporter = amReporter;
// Register with the AMReporter when the callable is setup. Unregister once it starts running.
- if (jobToken != null) {
this.amReporter.registerTask(request.getAmHost(), request.getAmPort(),
- request.getUser(), jobToken, fragmentInfo.getQueryInfo().getQueryIdentifier());
+ vertex.getUser(), jobToken, fragmentInfo.getQueryInfo().getQueryIdentifier());
- }
this.metrics = metrics;
- this.requestId = request.getFragmentSpec().getFragmentIdentifierString();
+ this.requestId = taskSpec.getTaskAttemptID().toString();
// TODO Change this to the queryId/Name when that's available.
- this.queryId = request.getFragmentSpec().getDagName();
+ this.queryId = vertex.getDagName();
this.killedTaskHandler = killedTaskHandler;
this.fragmentCompletionHanler = fragmentCompleteHandler;
this.tezHadoopShim = tezHadoopShim;
http://git-wip-us.apache.org/repos/asf/hive/blob/e0579097/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------