Posted to commits@hive.apache.org by sp...@apache.org on 2016/05/06 20:42:43 UTC

[12/50] [abbrv] hive git commit: Merge branch 'master' into llap

Merge branch 'master' into llap

Conflicts:
	llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
	llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
	llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e0579097
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e0579097
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e0579097

Branch: refs/heads/java8
Commit: e057909732b40b581fcad3f61fb798600f01ecdf
Parents: 4847f65 8729966
Author: Jason Dere <jd...@hortonworks.com>
Authored: Wed May 4 00:17:12 2016 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Wed May 4 00:17:12 2016 -0700

----------------------------------------------------------------------
 HIVE-13509.2.patch                              |  478 --
 .../ext/LlapTaskUmbilicalExternalClient.java    |   18 +-
 .../daemon/rpc/LlapDaemonProtocolProtos.java    | 7000 +++++++++++-------
 .../org/apache/hadoop/hive/llap/DaemonId.java   |   41 +
 .../hive/llap/security/LlapTokenIdentifier.java |   39 +-
 .../hive/llap/security/LlapTokenProvider.java   |    2 +-
 .../apache/hadoop/hive/llap/tez/Converters.java |   84 +-
 .../src/protobuf/LlapDaemonProtocol.proto       |   70 +-
 .../hadoop/hive/llap/tez/TestConverters.java    |   51 +-
 .../hadoop/hive/llap/LlapBaseInputFormat.java   |   32 +-
 .../hive/llap/daemon/ContainerRunner.java       |    9 +-
 .../llap/daemon/impl/ContainerRunnerImpl.java   |  135 +-
 .../hive/llap/daemon/impl/LlapDaemon.java       |   52 +-
 .../daemon/impl/LlapProtocolServerImpl.java     |   41 +-
 .../hive/llap/daemon/impl/LlapTokenChecker.java |  137 +
 .../llap/daemon/impl/QueryFragmentInfo.java     |   23 +-
 .../hadoop/hive/llap/daemon/impl/QueryInfo.java |   26 +-
 .../hive/llap/daemon/impl/QueryTracker.java     |   97 +-
 .../hadoop/hive/llap/daemon/impl/Scheduler.java |    2 +
 .../llap/daemon/impl/TaskExecutorService.java   |   17 +-
 .../llap/daemon/impl/TaskRunnerCallable.java    |   77 +-
 .../hive/llap/security/LlapSecurityHelper.java  |   15 +-
 .../hive/llap/security/SecretManager.java       |   19 +-
 .../hive/llap/daemon/MiniLlapCluster.java       |    2 +-
 .../daemon/impl/TaskExecutorTestHelpers.java    |   44 +-
 .../impl/TestLlapDaemonProtocolServerImpl.java  |    2 +-
 .../llap/daemon/impl/TestLlapTokenChecker.java  |   96 +
 .../TestFirstInFirstOutComparator.java          |   27 +-
 .../llap/tezplugins/LlapTaskCommunicator.java   |   31 +-
 .../hadoop/hive/metastore/txn/TxnHandler.java   |    5 +
 .../hive/ql/exec/tez/TezSessionState.java       |    3 +-
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java    |   12 +
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  |    1 -
 .../ql/parse/UpdateDeleteSemanticAnalyzer.java  |   16 +-
 .../apache/hadoop/hive/ql/TestTxnCommands2.java |   22 +
 .../hive/ql/lockmgr/TestDbTxnManager2.java      |  114 +
 .../dynpart_sort_optimization_acid.q.out        |  120 +-
 37 files changed, 5479 insertions(+), 3481 deletions(-)
----------------------------------------------------------------------
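
For illustration only, a minimal sketch (not part of the commit) of the pattern
this merge applies throughout: fragment ids are no longer read from
FragmentSpec.getFragmentIdentifierString(), but derived from the new
SignableVertexSpec plus fragment/attempt numbers. The accessors mirror those
used in the diffs below; anything beyond them is an assumption.

    import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
    import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
    import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexIdentifier;
    import org.apache.hadoop.hive.llap.tez.Converters;
    import org.apache.tez.dag.records.TezTaskAttemptID;

    final class FragmentIds {
      // Derive the fragment id string from a SubmitWorkRequestProto, as the
      // merged LlapTaskUmbilicalExternalClient.submitWork() now does.
      static String fragmentId(SubmitWorkRequestProto request) {
        SignableVertexSpec vertex = request.getWorkSpec().getVertex();
        VertexIdentifier vId = vertex.getVertexIdentifier();
        TezTaskAttemptID attemptId = Converters.createTaskAttemptId(
            vId, request.getFragmentNumber(), request.getAttemptNumber());
        return attemptId.toString();
      }
    }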


http://git-wip-us.apache.org/repos/asf/hive/blob/e0579097/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
----------------------------------------------------------------------
diff --cc llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
index fe2fd7c,0000000..6e2c85d
mode 100644,000000..100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
@@@ -1,413 -1,0 +1,421 @@@
 +package org.apache.hadoop.hive.llap.ext;
 +
 +import java.io.IOException;
 +import java.net.InetSocketAddress;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.List;
 +import java.util.concurrent.atomic.AtomicLong;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.ConcurrentMap;
 +import java.util.concurrent.ScheduledThreadPoolExecutor;
 +import java.util.concurrent.TimeUnit;
 +
 +import com.google.common.base.Preconditions;
 +import org.apache.commons.collections4.ListUtils;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.hive.conf.HiveConf;
 +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
++import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
 +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
++import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexIdentifier;
 +import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
++import org.apache.hadoop.hive.llap.tez.Converters;
 +import org.apache.hadoop.hive.llap.tez.LlapProtocolClientProxy;
 +import org.apache.hadoop.hive.llap.tezplugins.helpers.LlapTaskUmbilicalServer;
 +import org.apache.hadoop.io.Text;
 +import org.apache.hadoop.ipc.ProtocolSignature;
 +import org.apache.hadoop.security.token.Token;
 +import org.apache.hadoop.service.AbstractService;
 +import org.apache.hadoop.yarn.api.records.ContainerId;
 +import org.apache.hadoop.yarn.util.ConverterUtils;
 +import org.apache.tez.common.security.JobTokenIdentifier;
 +import org.apache.tez.dag.api.TezException;
 +import org.apache.tez.dag.records.TezTaskAttemptID;
 +import org.apache.tez.runtime.api.Event;
 +import org.apache.tez.runtime.api.impl.EventType;
 +import org.apache.tez.runtime.api.impl.TezEvent;
 +import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
 +import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +
 +public class LlapTaskUmbilicalExternalClient extends AbstractService {
 +
 +  private static final Logger LOG = LoggerFactory.getLogger(LlapTaskUmbilicalExternalClient.class);
 +
 +  private final LlapProtocolClientProxy communicator;
 +  private volatile LlapTaskUmbilicalServer llapTaskUmbilicalServer;
 +  private final Configuration conf;
 +  private final LlapTaskUmbilicalProtocol umbilical;
 +
 +  protected final String tokenIdentifier;
 +  protected final Token<JobTokenIdentifier> sessionToken;
 +
 +  private final ConcurrentMap<String, PendingEventData> pendingEvents = new ConcurrentHashMap<>();
 +  private final ConcurrentMap<String, TaskHeartbeatInfo> registeredTasks = new ConcurrentHashMap<String, TaskHeartbeatInfo>();
 +  private LlapTaskUmbilicalExternalResponder responder = null;
 +  private final ScheduledThreadPoolExecutor timer;
 +  private final long connectionTimeout;
 +
 +  private static class TaskHeartbeatInfo {
 +    final String taskAttemptId;
 +    final String hostname;
 +    final int port;
 +    final AtomicLong lastHeartbeat = new AtomicLong();
 +
 +    public TaskHeartbeatInfo(String taskAttemptId, String hostname, int port) {
 +      this.taskAttemptId = taskAttemptId;
 +      this.hostname = hostname;
 +      this.port = port;
 +      this.lastHeartbeat.set(System.currentTimeMillis());
 +    }
 +  }
 +
 +  private static class PendingEventData {
 +    final TaskHeartbeatInfo heartbeatInfo;
 +    final List<TezEvent> tezEvents;
 +
 +    public PendingEventData(TaskHeartbeatInfo heartbeatInfo, List<TezEvent> tezEvents) {
 +      this.heartbeatInfo = heartbeatInfo;
 +      this.tezEvents = tezEvents;
 +    }
 +  }
 +
 +  // TODO KKK Work out the details of the tokenIdentifier, and the session token.
 +  // It may just be possible to create one here - since Shuffle is not involved, and this is only used
 +  // for communication from LLAP-Daemons to the server. It will need to be sent in as part
 +  // of the job submission request.
 +  public LlapTaskUmbilicalExternalClient(Configuration conf, String tokenIdentifier,
 +      Token<JobTokenIdentifier> sessionToken, LlapTaskUmbilicalExternalResponder responder) {
 +    super(LlapTaskUmbilicalExternalClient.class.getName());
 +    this.conf = conf;
 +    this.umbilical = new LlapTaskUmbilicalExternalImpl();
 +    this.tokenIdentifier = tokenIdentifier;
 +    this.sessionToken = sessionToken;
 +    this.responder = responder;
 +    this.timer = new ScheduledThreadPoolExecutor(1);
 +    this.connectionTimeout = HiveConf.getTimeVar(conf,
 +        HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
 +    // TODO. No support for the LLAP token yet. Add support for configurable threads, however 1 should always be enough.
 +    this.communicator = new LlapProtocolClientProxy(1, conf, null);
 +    this.communicator.init(conf);
 +  }
 +
 +  @Override
 +  public void serviceStart() throws IOException {
 +    // If we use a single server for multiple external clients, then consider using more than one handler.
 +    int numHandlers = 1;
 +    llapTaskUmbilicalServer = new LlapTaskUmbilicalServer(conf, umbilical, numHandlers, tokenIdentifier, sessionToken);
 +    communicator.start();
 +  }
 +
 +  @Override
 +  public void serviceStop() {
 +    llapTaskUmbilicalServer.shutdownServer();
 +    timer.shutdown();
 +    if (this.communicator != null) {
 +      this.communicator.stop();
 +    }
 +  }
 +
 +  public InetSocketAddress getAddress() {
 +    return llapTaskUmbilicalServer.getAddress();
 +  }
 +
 +
 +  /**
 +   * Submit the work for actual execution.
 +   * @param submitWorkRequestProto
 +   */
 +  public void submitWork(final SubmitWorkRequestProto submitWorkRequestProto, String llapHost, int llapPort, List<TezEvent> tezEvents) {
 +    // Register the pending events to be sent for this spec.
-     String fragmentId = submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString();
++    SignableVertexSpec vertex = submitWorkRequestProto.getWorkSpec().getVertex();
++    VertexIdentifier vId = vertex.getVertexIdentifier();
++    TezTaskAttemptID attemptId = Converters.createTaskAttemptId(
++        vId, submitWorkRequestProto.getFragmentNumber(), submitWorkRequestProto.getAttemptNumber());
++    final String fragmentId = attemptId.toString();
++
 +    PendingEventData pendingEventData = new PendingEventData(
 +        new TaskHeartbeatInfo(fragmentId, llapHost, llapPort),
 +        tezEvents);
 +    pendingEvents.putIfAbsent(fragmentId, pendingEventData);
 +
 +    // Set up a timer task to check for heartbeat timeouts
 +    timer.scheduleAtFixedRate(new HeartbeatCheckTask(),
 +        connectionTimeout, connectionTimeout, TimeUnit.MILLISECONDS);
 +
 +    // Send out the actual SubmitWorkRequest
 +    communicator.sendSubmitWork(submitWorkRequestProto, llapHost, llapPort,
 +        new LlapProtocolClientProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.SubmitWorkResponseProto>() {
 +
 +          @Override
 +          public void setResponse(LlapDaemonProtocolProtos.SubmitWorkResponseProto response) {
 +            if (response.hasSubmissionState()) {
 +              if (response.getSubmissionState().equals(LlapDaemonProtocolProtos.SubmissionStateProto.REJECTED)) {
-                 String msg = "Fragment: " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString() + " rejected. Server Busy.";
++                String msg = "Fragment: " + fragmentId + " rejected. Server Busy.";
 +                LOG.info(msg);
 +                if (responder != null) {
 +                  Throwable err = new RuntimeException(msg);
-                   responder.submissionFailed(submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString(), err);
++                  responder.submissionFailed(fragmentId, err);
 +                }
 +                return;
 +              }
 +            }
 +          }
 +
 +          @Override
 +          public void indicateError(Throwable t) {
-             String msg = "Failed to submit: " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString();
++            String msg = "Failed to submit: " + fragmentId;
 +            LOG.error(msg, t);
 +            Throwable err = new RuntimeException(msg, t);
-             responder.submissionFailed(submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString(), err);
++            responder.submissionFailed(fragmentId, err);
 +          }
 +        });
 +
 +
 +
 +
 +
 +//    // TODO Also send out information saying that the fragment is finishable - if that is not already included in the main fragment.
 +//    // This entire call is only required if we're doing more than scans. MRInput has no dependencies and is always finishable
 +//    QueryIdentifierProto queryIdentifier = QueryIdentifierProto
 +//        .newBuilder()
 +//        .setAppIdentifier(submitWorkRequestProto.getApplicationIdString()).setDagIdentifier(submitWorkRequestProto.getFragmentSpec().getDagId())
 +//        .build();
 +//    LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto sourceStateUpdatedRequest =
 +//        LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto.newBuilder().setQueryIdentifier(queryIdentifier).setState(
 +//            LlapDaemonProtocolProtos.SourceStateProto.S_SUCCEEDED).
 +//            setSrcName(TODO)
 +//    communicator.sendSourceStateUpdate(LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto.newBuilder().setQueryIdentifier(submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString()).set);
 +
 +
 +  }
 +
 +  private void updateHeartbeatInfo(String taskAttemptId) {
 +    int updateCount = 0;
 +
 +    PendingEventData pendingEventData = pendingEvents.get(taskAttemptId);
 +    if (pendingEventData != null) {
 +      pendingEventData.heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis());
 +      updateCount++;
 +    }
 +
 +    TaskHeartbeatInfo heartbeatInfo = registeredTasks.get(taskAttemptId);
 +    if (heartbeatInfo != null) {
 +      heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis());
 +      updateCount++;
 +    }
 +
 +    if (updateCount == 0) {
 +      LOG.warn("No tasks found for heartbeat from taskAttemptId " + taskAttemptId);
 +    }
 +  }
 +
 +  private void updateHeartbeatInfo(String hostname, int port) {
 +    int updateCount = 0;
 +
 +    for (String key : pendingEvents.keySet()) {
 +      PendingEventData pendingEventData = pendingEvents.get(key);
 +      if (pendingEventData != null) {
 +        if (pendingEventData.heartbeatInfo.hostname.equals(hostname)
 +            && pendingEventData.heartbeatInfo.port == port) {
 +          pendingEventData.heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis());
 +          updateCount++;
 +        }
 +      }
 +    }
 +
 +    for (String key : registeredTasks.keySet()) {
 +      TaskHeartbeatInfo heartbeatInfo = registeredTasks.get(key);
 +      if (heartbeatInfo != null) {
 +        if (heartbeatInfo.hostname.equals(hostname)
 +            && heartbeatInfo.port == port) {
 +          heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis());
 +          updateCount++;
 +        }
 +      }
 +    }
 +
 +    if (updateCount == 0) {
 +      LOG.info("No tasks found for heartbeat from hostname " + hostname + ", port " + port);
 +    }
 +  }
 +
 +  private class HeartbeatCheckTask implements Runnable {
 +    public void run() {
 +      long currentTime = System.currentTimeMillis();
 +      List<String> timedOutTasks = new ArrayList<String>();
 +
 +      // Check both pending and registered tasks for timeouts
 +      for (String key : pendingEvents.keySet()) {
 +        PendingEventData pendingEventData = pendingEvents.get(key);
 +        if (pendingEventData != null) {
 +          if (currentTime - pendingEventData.heartbeatInfo.lastHeartbeat.get() >= connectionTimeout) {
 +            timedOutTasks.add(key);
 +          }
 +        }
 +      }
 +      for (String timedOutTask : timedOutTasks) {
 +        LOG.info("Pending taskAttemptId " + timedOutTask + " timed out");
 +        responder.heartbeatTimeout(timedOutTask);
 +        pendingEvents.remove(timedOutTask);
 +        // TODO: Do we need to tell the LLAP daemon we are no longer interested in this task?
 +      }
 +
 +      timedOutTasks.clear();
 +      for (String key : registeredTasks.keySet()) {
 +        TaskHeartbeatInfo heartbeatInfo = registeredTasks.get(key);
 +        if (heartbeatInfo != null) {
 +          if (currentTime - heartbeatInfo.lastHeartbeat.get() >= connectionTimeout) {
 +            timedOutTasks.add(key);
 +          }
 +        }
 +      }
 +      for (String timedOutTask : timedOutTasks) {
 +        LOG.info("Running taskAttemptId " + timedOutTask + " timed out");
 +        responder.heartbeatTimeout(timedOutTask);
 +        registeredTasks.remove(timedOutTask);
 +        // TODO: Do we need to tell the LLAP daemon we are no longer interested in this task?
 +      }
 +    }
 +  }
 +
 +  public interface LlapTaskUmbilicalExternalResponder {
 +    void submissionFailed(String fragmentId, Throwable throwable);
 +    void heartbeat(TezHeartbeatRequest request);
 +    void taskKilled(TezTaskAttemptID taskAttemptId);
 +    void heartbeatTimeout(String fragmentId);
 +  }
 +
 +
 +
 +  // TODO Ideally, the server should be shared across all client sessions running on the same node.
 +  private class LlapTaskUmbilicalExternalImpl implements LlapTaskUmbilicalProtocol {
 +
 +    @Override
 +    public boolean canCommit(TezTaskAttemptID taskid) throws IOException {
 +      // Expecting only a single instance of a task to be running.
 +      return true;
 +    }
 +
 +    @Override
 +    public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOException,
 +        TezException {
 +      // Keep-alive information. The client should be informed and will have to take care of re-submitting the work.
 +      // Some parts of fault tolerance go here.
 +
 +      // This also provides completion information, and a possible notification when the task actually starts running (first heartbeat)
 +
 +      if (LOG.isDebugEnabled()) {
 +        LOG.debug("Received heartbeat from container, request=" + request);
 +      }
 +
 +      // Incoming events can be ignored until the point when shuffle needs to be handled, instead of just scans.
 +      TezHeartbeatResponse response = new TezHeartbeatResponse();
 +
 +      response.setLastRequestId(request.getRequestId());
 +      // Assuming TaskAttemptId and FragmentIdentifierString are the same. Verify this.
 +      TezTaskAttemptID taskAttemptId = request.getCurrentTaskAttemptID();
 +      String taskAttemptIdString = taskAttemptId.toString();
 +
 +      updateHeartbeatInfo(taskAttemptIdString);
 +
 +      List<TezEvent> tezEvents = null;
 +      PendingEventData pendingEventData = pendingEvents.remove(taskAttemptIdString);
 +      if (pendingEventData == null) {
 +        tezEvents = Collections.emptyList();
 +
 +        // If this heartbeat was not from a pending event and it's not in our list of registered tasks,
 +        if (!registeredTasks.containsKey(taskAttemptIdString)) {
 +          LOG.info("Unexpected heartbeat from " + taskAttemptIdString);
 +          response.setShouldDie(); // Do any of the other fields need to be set?
 +          return response;
 +        }
 +      } else {
 +        tezEvents = pendingEventData.tezEvents;
 +        // Tasks removed from the pending list should then be added to the registered list.
 +        registeredTasks.put(taskAttemptIdString, pendingEventData.heartbeatInfo);
 +      }
 +
 +      // The from-event IDs are irrelevant here; they can be tracked in the AM itself, instead of polluting the task.
 +      // Also since we have all the MRInput events here - they'll all be sent in together.
 +      response.setNextFromEventId(0); // Irrelevant. See comment above.
 +      response.setNextPreRoutedEventId(0); //Irrelevant. See comment above.
 +      response.setEvents(tezEvents);
 +
 +      List<TezEvent> inEvents = request.getEvents();
 +      if (LOG.isDebugEnabled()) {
 +        LOG.debug("Heartbeat from " + taskAttemptIdString +
 +            " events: " + (inEvents != null ? inEvents.size() : -1));
 +      }
 +      for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
 +        EventType eventType = tezEvent.getEventType();
 +        switch (eventType) {
 +          case TASK_ATTEMPT_COMPLETED_EVENT:
 +            LOG.debug("Task completed event for " + taskAttemptIdString);
 +            registeredTasks.remove(taskAttemptIdString);
 +            break;
 +          case TASK_ATTEMPT_FAILED_EVENT:
 +            LOG.debug("Task failed event for " + taskAttemptIdString);
 +            registeredTasks.remove(taskAttemptIdString);
 +            break;
 +          case TASK_STATUS_UPDATE_EVENT:
 +            // If we want to handle counters
 +            LOG.debug("Task update event for " + taskAttemptIdString);
 +            break;
 +          default:
 +            LOG.warn("Unhandled event type " + eventType);
 +            break;
 +        }
 +      }
 +
 +      // Pass the request on to the responder
 +      try {
 +        if (responder != null) {
 +          responder.heartbeat(request);
 +        }
 +      } catch (Exception err) {
 +        LOG.error("Error during responder execution", err);
 +      }
 +
 +      return response;
 +    }
 +
 +    @Override
 +    public void nodeHeartbeat(Text hostname, int port) throws IOException {
 +      updateHeartbeatInfo(hostname.toString(), port);
 +      // No need to propagate this to the responder
 +    }
 +
 +    @Override
 +    public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException {
 +      String taskAttemptIdString = taskAttemptId.toString();
 +      LOG.error("Task killed - " + taskAttemptIdString);
 +      registeredTasks.remove(taskAttemptIdString);
 +
 +      try {
 +        if (responder != null) {
 +          responder.taskKilled(taskAttemptId);
 +        }
 +      } catch (Exception err) {
 +        LOG.error("Error during responder execution", err);
 +      }
 +    }
 +
 +    @Override
 +    public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
 +      return 0;
 +    }
 +
 +    @Override
 +    public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
 +                                                  int clientMethodsHash) throws IOException {
 +      return ProtocolSignature.getProtocolSignature(this, protocol,
 +          clientVersion, clientMethodsHash);
 +    }
 +  }
 +
 +}
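
For context, a hedged usage sketch of the client above, mirroring the call
sequence in LlapBaseInputFormat.getRecordReader() from the next diff. The
wrapper class, method name, and all parameters here are placeholders supplied
by the caller; only the LlapTaskUmbilicalExternalClient calls themselves come
from the code above.

    import java.net.InetSocketAddress;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
    import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient;
    import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient.LlapTaskUmbilicalExternalResponder;
    import org.apache.hadoop.security.token.Token;
    import org.apache.tez.common.security.JobTokenIdentifier;
    import org.apache.tez.runtime.api.impl.TezEvent;

    final class UmbilicalClientExample {
      static void runFragment(Configuration conf, String tokenId,
          Token<JobTokenIdentifier> sessionToken, LlapTaskUmbilicalExternalResponder responder,
          SubmitWorkRequestProto request, String llapHost, int llapPort,
          List<TezEvent> tezEvents) {
        LlapTaskUmbilicalExternalClient client =
            new LlapTaskUmbilicalExternalClient(conf, tokenId, sessionToken, responder);
        client.init(conf);  // AbstractService lifecycle
        client.start();     // starts the umbilical server and the protocol client proxy
        // The request's amHost/amPort should point at this address.
        InetSocketAddress umbilicalAddress = client.getAddress();
        client.submitWork(request, llapHost, llapPort, tezEvents);
        // ... wait for the responder to report completion, failure, or timeout ...
        client.stop();      // shuts down the umbilical server, heartbeat timer, and communicator
      }
    }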

http://git-wip-us.apache.org/repos/asf/hive/blob/e0579097/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
----------------------------------------------------------------------
diff --cc llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
index 8db2f88,0000000..988002f
mode 100644,000000..100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
@@@ -1,476 -1,0 +1,480 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hive.llap;
 +
 +import java.util.ArrayList;
 +import java.util.List;
 +import java.util.Set;
 +import java.util.concurrent.LinkedBlockingQueue;
 +
 +import java.sql.SQLException;
 +import java.sql.Connection;
 +import java.sql.ResultSet;
 +import java.sql.Statement;
 +import java.sql.DriverManager;
 +
 +import java.io.IOException;
 +import java.io.DataInput;
 +import java.io.DataOutput;
 +import java.io.DataInputStream;
 +import java.io.ByteArrayInputStream;
 +import java.net.InetAddress;
 +import java.net.InetSocketAddress;
 +import java.net.Socket;
 +import java.nio.ByteBuffer;
 +
 +import org.apache.commons.collections4.ListUtils;
 +
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.hive.conf.HiveConf;
 +import org.apache.hadoop.hive.llap.LlapBaseRecordReader;
 +import org.apache.hadoop.hive.llap.LlapBaseRecordReader.ReaderEvent;
 +import org.apache.hadoop.hive.llap.LlapInputSplit;
 +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
++import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
 +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
++import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrBinary;
 +import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient;
 +import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient.LlapTaskUmbilicalExternalResponder;
 +import org.apache.hadoop.hive.llap.registry.ServiceInstance;
 +import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
 +import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 +import org.apache.hadoop.hive.llap.tez.Converters;
 +import org.apache.hadoop.io.DataInputBuffer;
 +import org.apache.hadoop.io.DataOutputBuffer;
 +import org.apache.hadoop.io.Text;
 +import org.apache.hadoop.io.WritableComparable;
 +import org.apache.hadoop.io.NullWritable;
 +import org.apache.hadoop.io.Writable;
 +import org.apache.hadoop.mapred.JobConf;
 +import org.apache.hadoop.mapred.InputFormat;
 +import org.apache.hadoop.mapred.InputSplit;
 +import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
 +import org.apache.hadoop.mapred.SplitLocationInfo;
 +import org.apache.hadoop.mapred.FileSplit;
 +import org.apache.hadoop.mapred.RecordReader;
 +import org.apache.hadoop.mapred.Reporter;
 +import org.apache.hadoop.security.Credentials;
 +import org.apache.hadoop.security.UserGroupInformation;
 +import org.apache.hadoop.security.token.Token;
 +import org.apache.hadoop.security.token.TokenIdentifier;
 +import org.apache.hadoop.util.Progressable;
 +import org.apache.hadoop.yarn.api.ApplicationConstants;
 +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 +import org.apache.hadoop.yarn.api.records.ApplicationId;
 +import org.apache.hadoop.yarn.api.records.ContainerId;
 +
 +import org.apache.tez.common.security.JobTokenIdentifier;
 +import org.apache.tez.common.security.TokenCache;
 +import org.apache.tez.dag.records.TezTaskAttemptID;
 +import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
 +import org.apache.tez.runtime.api.impl.EventType;
 +import org.apache.tez.runtime.api.impl.TaskSpec;
 +import org.apache.tez.runtime.api.impl.TezEvent;
 +import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
 +import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.base.Preconditions;
 +import com.google.common.collect.Lists;
 +import com.google.protobuf.ByteString;
 +
 +
 +public class LlapBaseInputFormat<V extends WritableComparable> implements InputFormat<NullWritable, V> {
 +
 +  private static final Logger LOG = LoggerFactory.getLogger(LlapBaseInputFormat.class);
 +
 +  private static String driverName = "org.apache.hive.jdbc.HiveDriver";
 +  private String url;  // "jdbc:hive2://localhost:10000/default"
 +  private String user; // "hive",
 +  private String pwd;  // ""
 +  private String query;
 +
 +  public static final String URL_KEY = "llap.if.hs2.connection";
 +  public static final String QUERY_KEY = "llap.if.query";
 +  public static final String USER_KEY = "llap.if.user";
 +  public static final String PWD_KEY = "llap.if.pwd";
 +
 +  public final String SPLIT_QUERY = "select get_splits(\"%s\",%d)";
 +
 +  private Connection con;
 +  private Statement stmt;
 +
 +  public LlapBaseInputFormat(String url, String user, String pwd, String query) {
 +    this.url = url;
 +    this.user = user;
 +    this.pwd = pwd;
 +    this.query = query;
 +  }
 +
 +  public LlapBaseInputFormat() {}
 +
 +
 +  @Override
 +  public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
 +
 +    LlapInputSplit llapSplit = (LlapInputSplit) split;
 +
 +    // Set conf to use LLAP user rather than current user for LLAP Zk registry.
 +    HiveConf.setVar(job, HiveConf.ConfVars.LLAP_ZK_REGISTRY_USER, llapSplit.getLlapUser());
 +    SubmitWorkInfo submitWorkInfo = SubmitWorkInfo.fromBytes(llapSplit.getPlanBytes());
 +
 +    ServiceInstance serviceInstance = getServiceInstance(job, llapSplit);
 +    String host = serviceInstance.getHost();
 +    int llapSubmitPort = serviceInstance.getRpcPort();
 +
 +    LOG.info("Found service instance for host " + host + " with rpc port " + llapSubmitPort
 +        + " and outputformat port " + serviceInstance.getOutputFormatPort());
 +
 +    LlapRecordReaderTaskUmbilicalExternalResponder umbilicalResponder =
 +        new LlapRecordReaderTaskUmbilicalExternalResponder();
 +    LlapTaskUmbilicalExternalClient llapClient =
 +      new LlapTaskUmbilicalExternalClient(job, submitWorkInfo.getTokenIdentifier(),
 +          submitWorkInfo.getToken(), umbilicalResponder);
 +    llapClient.init(job);
 +    llapClient.start();
 +
 +    SubmitWorkRequestProto submitWorkRequestProto =
 +      constructSubmitWorkRequestProto(submitWorkInfo, llapSplit.getSplitNum(),
 +          llapClient.getAddress(), submitWorkInfo.getToken());
 +
 +    TezEvent tezEvent = new TezEvent();
 +    DataInputBuffer dib = new DataInputBuffer();
 +    dib.reset(llapSplit.getFragmentBytes(), 0, llapSplit.getFragmentBytes().length);
 +    tezEvent.readFields(dib);
 +    List<TezEvent> tezEventList = Lists.newArrayList();
 +    tezEventList.add(tezEvent);
 +
 +    llapClient.submitWork(submitWorkRequestProto, host, llapSubmitPort, tezEventList);
 +
 +    String id = HiveConf.getVar(job, HiveConf.ConfVars.HIVEQUERYID) + "_" + llapSplit.getSplitNum();
 +
 +    HiveConf conf = new HiveConf();
 +    Socket socket = new Socket(host,
 +        serviceInstance.getOutputFormatPort());
 +
 +    LOG.debug("Socket connected");
 +
 +    socket.getOutputStream().write(id.getBytes());
 +    socket.getOutputStream().write(0);
 +    socket.getOutputStream().flush();
 +
 +    LOG.info("Registered id: " + id);
 +
 +    LlapBaseRecordReader recordReader = new LlapBaseRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class);
 +    umbilicalResponder.setRecordReader(recordReader);
 +    return recordReader;
 +  }
 +
 +  @Override
 +  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
 +    List<InputSplit> ins = new ArrayList<InputSplit>();
 +
 +    if (url == null) url = job.get(URL_KEY);
 +    if (query == null) query = job.get(QUERY_KEY);
 +    if (user == null) user = job.get(USER_KEY);
 +    if (pwd == null) pwd = job.get(PWD_KEY);
 +
 +    if (url == null || query == null) {
 +      throw new IllegalStateException();
 +    }
 +
 +    try {
 +      Class.forName(driverName);
 +    } catch (ClassNotFoundException e) {
 +      throw new IOException(e);
 +    }
 +
 +    try {
 +      con = DriverManager.getConnection(url, user, pwd);
 +      stmt = con.createStatement();
 +      String sql = String.format(SPLIT_QUERY, query, numSplits);
 +      ResultSet res = stmt.executeQuery(sql);
 +      while (res.next()) {
 +        // deserialize split
 +        DataInput in = new DataInputStream(res.getBinaryStream(1));
 +        InputSplitWithLocationInfo is = new LlapInputSplit();
 +        is.readFields(in);
 +        ins.add(is);
 +      }
 +
 +      res.close();
 +      stmt.close();
 +    } catch (Exception e) {
 +      throw new IOException(e);
 +    }
 +    return ins.toArray(new InputSplit[ins.size()]);
 +  }
 +
 +  public void close() {
 +    try {
 +      con.close();
 +    } catch (Exception e) {
 +      // ignore
 +    }
 +  }
 +
 +  private ServiceInstance getServiceInstance(JobConf job, LlapInputSplit llapSplit) throws IOException {
 +    LlapRegistryService registryService = LlapRegistryService.getClient(job);
 +    String host = llapSplit.getLocations()[0];
 +
 +    ServiceInstance serviceInstance = getServiceInstanceForHost(registryService, host);
 +    if (serviceInstance == null) {
 +      throw new IOException("No service instances found for " + host + " in registry");
 +    }
 +
 +    return serviceInstance;
 +  }
 +
 +  private ServiceInstance getServiceInstanceForHost(LlapRegistryService registryService, String host) throws IOException {
 +    InetAddress address = InetAddress.getByName(host);
 +    ServiceInstanceSet instanceSet = registryService.getInstances();
 +    ServiceInstance serviceInstance = null;
 +
 +    // The name used in the service registry may not match the host name we're using.
 +    // Try hostname/canonical hostname/host address
 +
 +    String name = address.getHostName();
 +    LOG.info("Searching service instance by hostname " + name);
 +    serviceInstance = selectServiceInstance(instanceSet.getByHost(name));
 +    if (serviceInstance != null) {
 +      return serviceInstance;
 +    }
 +
 +    name = address.getCanonicalHostName();
 +    LOG.info("Searching service instance by canonical hostname " + name);
 +    serviceInstance = selectServiceInstance(instanceSet.getByHost(name));
 +    if (serviceInstance != null) {
 +      return serviceInstance;
 +    }
 +
 +    name = address.getHostAddress();
 +    LOG.info("Searching service instance by address " + name);
 +    serviceInstance = selectServiceInstance(instanceSet.getByHost(name));
 +    if (serviceInstance != null) {
 +      return serviceInstance;
 +    }
 +
 +    return serviceInstance;
 +  }
 +
 +  private ServiceInstance selectServiceInstance(Set<ServiceInstance> serviceInstances) {
 +    if (serviceInstances == null || serviceInstances.isEmpty()) {
 +      return null;
 +    }
 +
 +    // Get the first live service instance
 +    for (ServiceInstance serviceInstance : serviceInstances) {
 +      if (serviceInstance.isAlive()) {
 +        return serviceInstance;
 +      }
 +    }
 +
 +    LOG.info("No live service instances were found");
 +    return null;
 +  }
 +
 +  private SubmitWorkRequestProto constructSubmitWorkRequestProto(SubmitWorkInfo submitWorkInfo,
 +      int taskNum,
 +      InetSocketAddress address,
 +      Token<JobTokenIdentifier> token) throws
 +        IOException {
 +    TaskSpec taskSpec = submitWorkInfo.getTaskSpec();
 +    ApplicationId appId = submitWorkInfo.getFakeAppId();
 +
-     SubmitWorkRequestProto.Builder builder = SubmitWorkRequestProto.newBuilder();
++    int attemptId = taskSpec.getTaskAttemptID().getId();
 +    // This works, assuming the executor is running within YARN.
-     LOG.info("Setting user in submitWorkRequest to: " +
-         System.getenv(ApplicationConstants.Environment.USER.name()));
-     builder.setUser(System.getenv(ApplicationConstants.Environment.USER.name()));
-     builder.setApplicationIdString(appId.toString());
-     builder.setAppAttemptNumber(0);
-     builder.setTokenIdentifier(appId.toString());
++    String user = System.getenv(ApplicationConstants.Environment.USER.name());
++    LOG.info("Setting user in submitWorkRequest to: " + user);
++    SignableVertexSpec svs = Converters.convertTaskSpecToProto(
++        taskSpec, attemptId, appId.toString(), null, user); // TODO signatureKeyId
 +
 +    ContainerId containerId =
 +      ContainerId.newInstance(ApplicationAttemptId.newInstance(appId, 0), taskNum);
-     builder.setContainerIdString(containerId.toString());
 +
-     builder.setAmHost(address.getHostName());
-     builder.setAmPort(address.getPort());
++
 +    Credentials taskCredentials = new Credentials();
 +    // Credentials can change across DAGs. Ideally construct only once per DAG.
 +    // TODO Figure out where credentials will come from. Normally Hive sets up
 +    // URLs on the tez dag, for which Tez acquires credentials.
 +
 +    //    taskCredentials.addAll(getContext().getCredentials());
 +
 +    //    Preconditions.checkState(currentQueryIdentifierProto.getDagIdentifier() ==
 +    //        taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId());
 +    //    ByteBuffer credentialsBinary = credentialMap.get(currentQueryIdentifierProto);
 +    //    if (credentialsBinary == null) {
 +    //      credentialsBinary = serializeCredentials(getContext().getCredentials());
 +    //      credentialMap.putIfAbsent(currentQueryIdentifierProto, credentialsBinary.duplicate());
 +    //    } else {
 +    //      credentialsBinary = credentialsBinary.duplicate();
 +    //    }
 +    //    builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary));
 +    Credentials credentials = new Credentials();
 +    TokenCache.setSessionToken(token, credentials);
 +    ByteBuffer credentialsBinary = serializeCredentials(credentials);
-     builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary));
- 
- 
-     builder.setFragmentSpec(Converters.convertTaskSpecToProto(taskSpec));
 +
 +    FragmentRuntimeInfo.Builder runtimeInfo = FragmentRuntimeInfo.newBuilder();
 +    runtimeInfo.setCurrentAttemptStartTime(System.currentTimeMillis());
 +    runtimeInfo.setWithinDagPriority(0);
 +    runtimeInfo.setDagStartTime(submitWorkInfo.getCreationTime());
 +    runtimeInfo.setFirstAttemptStartTime(submitWorkInfo.getCreationTime());
 +    runtimeInfo.setNumSelfAndUpstreamTasks(taskSpec.getVertexParallelism());
 +    runtimeInfo.setNumSelfAndUpstreamCompletedTasks(0);
 +
++    SubmitWorkRequestProto.Builder builder = SubmitWorkRequestProto.newBuilder();
 +
++    builder.setWorkSpec(VertexOrBinary.newBuilder().setVertex(svs).build());
++    // TODO work spec signature
++    builder.setFragmentNumber(taskSpec.getTaskAttemptID().getTaskID().getId());
++    builder.setAttemptNumber(0);
++    builder.setContainerIdString(containerId.toString());
++    builder.setAmHost(address.getHostName());
++    builder.setAmPort(address.getPort());
++    builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary));
 +    builder.setFragmentRuntimeInfo(runtimeInfo.build());
++
 +    return builder.build();
 +  }
 +
 +  private ByteBuffer serializeCredentials(Credentials credentials) throws IOException {
 +    Credentials containerCredentials = new Credentials();
 +    containerCredentials.addAll(credentials);
 +    DataOutputBuffer containerTokens_dob = new DataOutputBuffer();
 +    containerCredentials.writeTokenStorageToStream(containerTokens_dob);
 +    return ByteBuffer.wrap(containerTokens_dob.getData(), 0, containerTokens_dob.getLength());
 +  }
 +
 +  private static class LlapRecordReaderTaskUmbilicalExternalResponder implements LlapTaskUmbilicalExternalResponder {
 +    protected LlapBaseRecordReader recordReader = null;
 +    protected LinkedBlockingQueue<ReaderEvent> queuedEvents = new LinkedBlockingQueue<ReaderEvent>();
 +
 +    public LlapRecordReaderTaskUmbilicalExternalResponder() {
 +    }
 +
 +    @Override
 +    public void submissionFailed(String fragmentId, Throwable throwable) {
 +      try {
 +        sendOrQueueEvent(ReaderEvent.errorEvent(
 +            "Received submission failed event for fragment ID " + fragmentId));
 +      } catch (Exception err) {
 +        LOG.error("Error during heartbeat responder:", err);
 +      }
 +    }
 +
 +    @Override
 +    public void heartbeat(TezHeartbeatRequest request) {
 +      TezTaskAttemptID taskAttemptId = request.getCurrentTaskAttemptID();
 +      List<TezEvent> inEvents = request.getEvents();
 +      for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
 +        EventType eventType = tezEvent.getEventType();
 +        try {
 +          switch (eventType) {
 +            case TASK_ATTEMPT_COMPLETED_EVENT:
 +              sendOrQueueEvent(ReaderEvent.doneEvent());
 +              break;
 +            case TASK_ATTEMPT_FAILED_EVENT:
 +              TaskAttemptFailedEvent taskFailedEvent = (TaskAttemptFailedEvent) tezEvent.getEvent();
 +              sendOrQueueEvent(ReaderEvent.errorEvent(taskFailedEvent.getDiagnostics()));
 +              break;
 +            case TASK_STATUS_UPDATE_EVENT:
 +              // If we want to handle counters
 +              break;
 +            default:
 +              LOG.warn("Unhandled event type " + eventType);
 +              break;
 +          }
 +        } catch (Exception err) {
 +          LOG.error("Error during heartbeat responder:", err);
 +        }
 +      }
 +    }
 +
 +    @Override
 +    public void taskKilled(TezTaskAttemptID taskAttemptId) {
 +      try {
 +        sendOrQueueEvent(ReaderEvent.errorEvent(
 +            "Received task killed event for task ID " + taskAttemptId));
 +      } catch (Exception err) {
 +        LOG.error("Error during heartbeat responder:", err);
 +      }
 +    }
 +
 +    @Override
 +    public void heartbeatTimeout(String taskAttemptId) {
 +      try {
 +        sendOrQueueEvent(ReaderEvent.errorEvent(
 +            "Timed out waiting for heartbeat for task ID " + taskAttemptId));
 +      } catch (Exception err) {
 +        LOG.error("Error during heartbeat responder:", err);
 +      }
 +    }
 +
 +    public synchronized LlapBaseRecordReader getRecordReader() {
 +      return recordReader;
 +    }
 +
 +    public synchronized void setRecordReader(LlapBaseRecordReader recordReader) {
 +      this.recordReader = recordReader;
 +
 +      if (recordReader == null) {
 +        return;
 +      }
 +
 +      // If any events were queued by the responder, give them to the record reader now.
 +      while (!queuedEvents.isEmpty()) {
 +        ReaderEvent readerEvent = queuedEvents.poll();
 +        LOG.debug("Sending queued event to record reader: " + readerEvent.getEventType());
 +        recordReader.handleEvent(readerEvent);
 +      }
 +    }
 +
 +    /**
 +     * Send the ReaderEvents to the record reader, if it is registered to this responder.
 +     * If there is no registered record reader, add them to a list of pending reader events
 +     * since we don't want to drop these events.
 +     * @param readerEvent
 +     */
 +    protected synchronized void sendOrQueueEvent(ReaderEvent readerEvent) {
 +      LlapBaseRecordReader recordReader = getRecordReader();
 +      if (recordReader != null) {
 +        recordReader.handleEvent(readerEvent);
 +      } else {
 +        if (LOG.isDebugEnabled()) {
 +          LOG.debug("No registered record reader, queueing event " + readerEvent.getEventType()
 +              + " with message " + readerEvent.getMessage());
 +        }
 +
 +        try {
 +          queuedEvents.put(readerEvent);
 +        } catch (Exception err) {
 +          throw new RuntimeException("Unexpected exception while queueing reader event", err);
 +        }
 +      }
 +    }
 +
 +    /**
 +     * Clear the queued reader events when we no longer intend to deliver pending events to a registering record reader.
 +     */
 +    public void clearQueuedEvents() {
 +      queuedEvents.clear();
 +    }
 +  }
 +}
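
A hedged usage sketch for the input format above, driven through the JobConf
keys it reads in getSplits(). The connection values, credentials, and split
count are placeholders; only the keys and method signatures come from the code
above.

    import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;

    final class LlapIfExample {
      static InputSplit[] splitsFor(String query) throws Exception {
        JobConf job = new JobConf();
        job.set(LlapBaseInputFormat.URL_KEY, "jdbc:hive2://localhost:10000/default"); // placeholder HS2 url
        job.set(LlapBaseInputFormat.QUERY_KEY, query);
        job.set(LlapBaseInputFormat.USER_KEY, "hive"); // placeholder credentials
        job.set(LlapBaseInputFormat.PWD_KEY, "");
        LlapBaseInputFormat<Text> inputFormat = new LlapBaseInputFormat<>();
        // Runs "select get_splits(...)" over JDBC and deserializes each LlapInputSplit.
        return inputFormat.getSplits(job, 4);
      }
    }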

http://git-wip-us.apache.org/repos/asf/hive/blob/e0579097/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --cc llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index d8367ce,2bfe3ed..2524dc2
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@@ -263,13 -267,12 +267,12 @@@ public class ContainerRunnerImpl extend
          new QueryIdentifier(request.getQueryIdentifier().getAppIdentifier(),
              request.getQueryIdentifier().getDagIdentifier());
      LOG.info("Processing queryComplete notification for {}", queryIdentifier);
-     List<QueryFragmentInfo> knownFragments =
-         queryTracker
-             .queryComplete(queryIdentifier, request.getDeleteDelay());
-     LOG.info("Pending fragment count for completed query {} = {}", queryIdentifier,
+     List<QueryFragmentInfo> knownFragments = queryTracker.queryComplete(
+         queryIdentifier, request.getDeleteDelay(), false);
+     LOG.info("DBG: Pending fragment count for completed query {} = {}", queryIdentifier,
          knownFragments.size());
      for (QueryFragmentInfo fragmentInfo : knownFragments) {
 -      LOG.info("DBG: Issuing killFragment for completed query {} {}", queryIdentifier,
 +      LOG.info("Issuing killFragment for completed query {} {}", queryIdentifier,
            fragmentInfo.getFragmentIdentifierString());
        executorService.killFragment(fragmentInfo.getFragmentIdentifierString());
      }

http://git-wip-us.apache.org/repos/asf/hive/blob/e0579097/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/e0579097/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --cc llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 4a33373,3093de7..8594ee1
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@@ -134,15 -135,20 +135,18 @@@ public class TaskRunnerCallable extend
      this.memoryAvailable = memoryAvailable;
      this.confParams = confParams;
      this.jobToken = TokenCache.getSessionToken(credentials);
-     this.taskSpec = Converters.getTaskSpecfromProto(request.getFragmentSpec());
+     // TODO: support binary spec here or above
+     this.vertex = request.getWorkSpec().getVertex();
+     this.taskSpec = Converters.getTaskSpecfromProto(
+         vertex, request.getFragmentNumber(), request.getAttemptNumber(), attemptId);
      this.amReporter = amReporter;
     // Register with the AMReporter when the callable is set up. Unregister once it starts running.
 -    if (jobToken != null) {
      this.amReporter.registerTask(request.getAmHost(), request.getAmPort(),
-         request.getUser(), jobToken, fragmentInfo.getQueryInfo().getQueryIdentifier());
+         vertex.getUser(), jobToken, fragmentInfo.getQueryInfo().getQueryIdentifier());
 -    }
      this.metrics = metrics;
-     this.requestId = request.getFragmentSpec().getFragmentIdentifierString();
+     this.requestId = taskSpec.getTaskAttemptID().toString();
      // TODO Change this to the queryId/Name when that's available.
-     this.queryId = request.getFragmentSpec().getDagName();
+     this.queryId = vertex.getDagName();
      this.killedTaskHandler = killedTaskHandler;
      this.fragmentCompletionHanler = fragmentCompleteHandler;
      this.tezHadoopShim = tezHadoopShim;
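
The Converters calls above and in the llap-client diff are the two halves of
the new round trip: the client serializes a TaskSpec into a SignableVertexSpec,
and the daemon rebuilds a TaskSpec from it. A hedged sketch with signatures as
used in the diffs; the null signing key matches the TODO noted there, and the
wrapper class is hypothetical.

    import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
    import org.apache.hadoop.hive.llap.tez.Converters;
    import org.apache.tez.dag.records.TezTaskAttemptID;
    import org.apache.tez.runtime.api.impl.TaskSpec;

    final class SpecRoundTrip {
      // Client side (constructSubmitWorkRequestProto above): proto-ize the TaskSpec.
      static SignableVertexSpec toProto(TaskSpec taskSpec, String appIdString, String user) {
        int attemptId = taskSpec.getTaskAttemptID().getId();
        return Converters.convertTaskSpecToProto(
            taskSpec, attemptId, appIdString, null /* signatureKeyId, TODO per the diff */, user);
      }

      // Daemon side (TaskRunnerCallable above): rebuild the TaskSpec for execution.
      static TaskSpec fromProto(SignableVertexSpec vertex, int fragmentNumber,
          int attemptNumber, TezTaskAttemptID attemptId) {
        return Converters.getTaskSpecfromProto(vertex, fragmentNumber, attemptNumber, attemptId);
      }
    }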

http://git-wip-us.apache.org/repos/asf/hive/blob/e0579097/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------