You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jd...@apache.org on 2016/05/04 09:22:07 UTC
[04/10] hive git commit: HIVE-13442 : LLAP: refactor submit API to be
amenable to signing (Sergey Shelukhin, reviewed by Siddharth Seth)
http://git-wip-us.apache.org/repos/asf/hive/blob/0b5c27fd/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java b/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java
index ec6e439..e43b72b 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java
@@ -22,9 +22,11 @@ import com.google.protobuf.ByteString;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GroupInputSpecProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.UserPayloadProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexIdentifier;
+import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.dag.api.EntityDescriptor;
import org.apache.tez.dag.api.InputDescriptor;
@@ -33,7 +35,10 @@ import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.impl.GroupInputSpec;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
@@ -41,55 +46,88 @@ import org.apache.tez.runtime.api.impl.TaskSpec;
public class Converters {
- public static TaskSpec getTaskSpecfromProto(FragmentSpecProto FragmentSpecProto) {
- TezTaskAttemptID taskAttemptID =
- TezTaskAttemptID.fromString(FragmentSpecProto.getFragmentIdentifierString());
+ public static TaskSpec getTaskSpecfromProto(SignableVertexSpec vectorProto,
+ int fragmentNum, int attemptNum, TezTaskAttemptID attemptId) {
+ VertexIdentifier vertexId = vectorProto.getVertexIdentifier();
+ TezTaskAttemptID taskAttemptID = attemptId != null ? attemptId
+ : createTaskAttemptId(vertexId, fragmentNum, attemptNum);
ProcessorDescriptor processorDescriptor = null;
- if (FragmentSpecProto.hasProcessorDescriptor()) {
+ if (vectorProto.hasProcessorDescriptor()) {
processorDescriptor = convertProcessorDescriptorFromProto(
- FragmentSpecProto.getProcessorDescriptor());
+ vectorProto.getProcessorDescriptor());
}
- List<InputSpec> inputSpecList = new ArrayList<InputSpec>(FragmentSpecProto.getInputSpecsCount());
- if (FragmentSpecProto.getInputSpecsCount() > 0) {
- for (IOSpecProto inputSpecProto : FragmentSpecProto.getInputSpecsList()) {
+ List<InputSpec> inputSpecList = new ArrayList<InputSpec>(vectorProto.getInputSpecsCount());
+ if (vectorProto.getInputSpecsCount() > 0) {
+ for (IOSpecProto inputSpecProto : vectorProto.getInputSpecsList()) {
inputSpecList.add(getInputSpecFromProto(inputSpecProto));
}
}
List<OutputSpec> outputSpecList =
- new ArrayList<OutputSpec>(FragmentSpecProto.getOutputSpecsCount());
- if (FragmentSpecProto.getOutputSpecsCount() > 0) {
- for (IOSpecProto outputSpecProto : FragmentSpecProto.getOutputSpecsList()) {
+ new ArrayList<OutputSpec>(vectorProto.getOutputSpecsCount());
+ if (vectorProto.getOutputSpecsCount() > 0) {
+ for (IOSpecProto outputSpecProto : vectorProto.getOutputSpecsList()) {
outputSpecList.add(getOutputSpecFromProto(outputSpecProto));
}
}
List<GroupInputSpec> groupInputSpecs =
- new ArrayList<GroupInputSpec>(FragmentSpecProto.getGroupedInputSpecsCount());
- if (FragmentSpecProto.getGroupedInputSpecsCount() > 0) {
- for (GroupInputSpecProto groupInputSpecProto : FragmentSpecProto.getGroupedInputSpecsList()) {
+ new ArrayList<GroupInputSpec>(vectorProto.getGroupedInputSpecsCount());
+ if (vectorProto.getGroupedInputSpecsCount() > 0) {
+ for (GroupInputSpecProto groupInputSpecProto : vectorProto.getGroupedInputSpecsList()) {
groupInputSpecs.add(getGroupInputSpecFromProto(groupInputSpecProto));
}
}
TaskSpec taskSpec =
- new TaskSpec(taskAttemptID, FragmentSpecProto.getDagName(), FragmentSpecProto.getVertexName(),
- FragmentSpecProto.getVertexParallelism(), processorDescriptor, inputSpecList,
+ new TaskSpec(taskAttemptID, vectorProto.getDagName(), vectorProto.getVertexName(),
+ vectorProto.getVertexParallelism(), processorDescriptor, inputSpecList,
outputSpecList, groupInputSpecs);
return taskSpec;
}
- public static FragmentSpecProto convertTaskSpecToProto(TaskSpec taskSpec) {
- FragmentSpecProto.Builder builder = FragmentSpecProto.newBuilder();
- builder.setFragmentIdentifierString(taskSpec.getTaskAttemptID().toString());
+ public static TezTaskAttemptID createTaskAttemptId(
+ VertexIdentifier vertexId, int fragmentNum, int attemptNum) {
+ // Come ride the API roller-coaster! Tez IDs nest: application -> DAG -> vertex -> task -> attempt.
+ return TezTaskAttemptID.getInstance(
+ TezTaskID.getInstance(
+ TezVertexID.getInstance(
+ TezDAGID.getInstance(
+ ConverterUtils.toApplicationId(
+ vertexId.getApplicationIdString()),
+ vertexId.getDagId()),
+ vertexId.getVertexId()),
+ fragmentNum),
+ attemptNum);
+ }
+
+ public static VertexIdentifier createVertexIdentifier(
+ TezTaskAttemptID taId, int appAttemptId) {
+ VertexIdentifier.Builder idBuilder = VertexIdentifier.newBuilder();
+ idBuilder.setApplicationIdString(
+ taId.getTaskID().getVertexID().getDAGId().getApplicationId().toString());
+ idBuilder.setAppAttemptNumber(appAttemptId);
+ idBuilder.setDagId(taId.getTaskID().getVertexID().getDAGId().getId());
+ idBuilder.setVertexId(taId.getTaskID().getVertexID().getId());
+ return idBuilder.build();
+ }
+
+ public static SignableVertexSpec convertTaskSpecToProto(TaskSpec taskSpec,
+ int appAttemptId, String tokenIdentifier, Integer signatureKeyId, String user) {
+ TezTaskAttemptID tId = taskSpec.getTaskAttemptID();
+
+ SignableVertexSpec.Builder builder = SignableVertexSpec.newBuilder();
+ builder.setVertexIdentifier(createVertexIdentifier(tId, appAttemptId));
builder.setDagName(taskSpec.getDAGName());
- builder.setDagId(taskSpec.getDagIdentifier());
builder.setVertexName(taskSpec.getVertexName());
builder.setVertexParallelism(taskSpec.getVertexParallelism());
- builder.setFragmentNumber(taskSpec.getTaskAttemptID().getTaskID().getId());
- builder.setAttemptNumber(taskSpec.getTaskAttemptID().getId());
+ builder.setTokenIdentifier(tokenIdentifier);
+ builder.setUser(user);
+ if (signatureKeyId != null) {
+ builder.setSignatureKeyId(signatureKeyId);
+ }
if (taskSpec.getProcessorDescriptor() != null) {
builder.setProcessorDescriptor(
http://git-wip-us.apache.org/repos/asf/hive/blob/0b5c27fd/llap-common/src/protobuf/LlapDaemonProtocol.proto
----------------------------------------------------------------------
diff --git a/llap-common/src/protobuf/LlapDaemonProtocol.proto b/llap-common/src/protobuf/LlapDaemonProtocol.proto
index 5cdc02e..486ba0a 100644
--- a/llap-common/src/protobuf/LlapDaemonProtocol.proto
+++ b/llap-common/src/protobuf/LlapDaemonProtocol.proto
@@ -46,19 +46,38 @@ message GroupInputSpecProto {
optional EntityDescriptorProto merged_input_descriptor = 3;
}
+message VertexIdentifier {
+ optional string application_id_string = 1;
+ optional int32 app_attempt_number = 2;
+ optional int32 dag_id = 3;
+ optional int32 vertex_id = 4;
+}
+
+// The part of SubmitWork that can be signed
+message SignableVertexSpec
+{
+ optional string user = 1;
+ optional int64 signatureKeyId = 2;
+
+ optional VertexIdentifier vertexIdentifier = 3;
+ // Display names cannot be modified by the client for now. If needed, they should be sent to HS2, which will put them here.
+ optional string dag_name = 4;
+ optional string vertex_name = 5;
+
+ // The core vertex stuff
+ optional string token_identifier = 6;
+ optional EntityDescriptorProto processor_descriptor = 7;
+ repeated IOSpecProto input_specs = 8;
+ repeated IOSpecProto output_specs = 9;
+ repeated GroupInputSpecProto grouped_input_specs = 10;
+
+ optional int32 vertex_parallelism = 11; // An internal field required for Tez.
+}
-message FragmentSpecProto {
- optional string fragment_identifier_string = 1;
- optional string dag_name = 2;
- optional int32 dag_id = 11;
- optional string vertex_name = 3;
- optional EntityDescriptorProto processor_descriptor = 4;
- repeated IOSpecProto input_specs = 5;
- repeated IOSpecProto output_specs = 6;
- repeated GroupInputSpecProto grouped_input_specs = 7;
- optional int32 vertex_parallelism = 8;
- optional int32 fragment_number =9;
- optional int32 attempt_number = 10;
+// Union
+message VertexOrBinary {
+ optional SignableVertexSpec vertex = 1;
+ optional bytes vertexBinary = 2; // SignableVertexSpec
}
message FragmentRuntimeInfo {
@@ -81,18 +100,24 @@ message QueryIdentifierProto {
}
message SubmitWorkRequestProto {
- optional string container_id_string = 1;
- optional string am_host = 2;
- optional int32 am_port = 3;
- optional string token_identifier = 4;
- optional bytes credentials_binary = 5;
- optional string user = 6;
- optional string application_id_string = 7;
- optional int32 app_attempt_number = 8;
- optional FragmentSpecProto fragment_spec = 9;
- optional FragmentRuntimeInfo fragment_runtime_info = 10;
+ optional VertexOrBinary work_spec = 1;
+ optional bytes work_spec_signature = 2;
+
+ optional int32 fragment_number = 3;
+ optional int32 attempt_number = 4;
+
+ optional string container_id_string = 5;
+ optional string am_host = 6;
+ optional int32 am_port = 7;
+
+ // Credentials are not signed - the client can add, e.g., its own HDFS tokens.
+ optional bytes credentials_binary = 8;
+
+ // Not supported/honored for external clients right now.
+ optional FragmentRuntimeInfo fragment_runtime_info = 9;
}
+
enum SubmissionStateProto {
ACCEPTED = 1;
REJECTED = 2;
http://git-wip-us.apache.org/repos/asf/hive/blob/0b5c27fd/llap-common/src/test/org/apache/hadoop/hive/llap/tez/TestConverters.java
----------------------------------------------------------------------
diff --git a/llap-common/src/test/org/apache/hadoop/hive/llap/tez/TestConverters.java b/llap-common/src/test/org/apache/hadoop/hive/llap/tez/TestConverters.java
index d4cdac1..349ee14 100644
--- a/llap-common/src/test/org/apache/hadoop/hive/llap/tez/TestConverters.java
+++ b/llap-common/src/test/org/apache/hadoop/hive/llap/tez/TestConverters.java
@@ -23,8 +23,8 @@ import java.util.List;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.UserPayloadProto;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.dag.api.InputDescriptor;
@@ -77,28 +77,24 @@ public class TestConverters {
new TaskSpec(tezTaskAttemptId, "dagName", "vertexName", 10, processorDescriptor,
inputSpecList, outputSpecList, null);
+ SignableVertexSpec vertexProto = Converters.convertTaskSpecToProto(taskSpec, 0, "", null, "");
- FragmentSpecProto fragmentSpecProto = Converters.convertTaskSpecToProto(taskSpec);
-
-
- assertEquals("dagName", fragmentSpecProto.getDagName());
- assertEquals("vertexName", fragmentSpecProto.getVertexName());
- assertEquals(tezTaskAttemptId.toString(), fragmentSpecProto.getFragmentIdentifierString());
- assertEquals(tezDagId.getId(), fragmentSpecProto.getDagId());
- assertEquals(tezTaskAttemptId.getId(), fragmentSpecProto.getAttemptNumber());
- assertEquals(tezTaskId.getId(), fragmentSpecProto.getFragmentNumber());
+ assertEquals("dagName", vertexProto.getDagName());
+ assertEquals("vertexName", vertexProto.getVertexName());
+ assertEquals(appId.toString(), vertexProto.getVertexIdentifier().getApplicationIdString());
+ assertEquals(tezDagId.getId(), vertexProto.getVertexIdentifier().getDagId());
assertEquals(processorDescriptor.getClassName(),
- fragmentSpecProto.getProcessorDescriptor().getClassName());
+ vertexProto.getProcessorDescriptor().getClassName());
assertEquals(processorDescriptor.getUserPayload().getPayload(),
- fragmentSpecProto.getProcessorDescriptor().getUserPayload().getUserPayload()
+ vertexProto.getProcessorDescriptor().getUserPayload().getUserPayload()
.asReadOnlyByteBuffer());
- assertEquals(2, fragmentSpecProto.getInputSpecsCount());
- assertEquals(2, fragmentSpecProto.getOutputSpecsCount());
+ assertEquals(2, vertexProto.getInputSpecsCount());
+ assertEquals(2, vertexProto.getOutputSpecsCount());
- verifyInputSpecAndProto(inputSpec1, fragmentSpecProto.getInputSpecs(0));
- verifyInputSpecAndProto(inputSpec2, fragmentSpecProto.getInputSpecs(1));
- verifyOutputSpecAndProto(outputSpec1, fragmentSpecProto.getOutputSpecs(0));
- verifyOutputSpecAndProto(outputSpec2, fragmentSpecProto.getOutputSpecs(1));
+ verifyInputSpecAndProto(inputSpec1, vertexProto.getInputSpecs(0));
+ verifyInputSpecAndProto(inputSpec2, vertexProto.getInputSpecs(1));
+ verifyOutputSpecAndProto(outputSpec1, vertexProto.getOutputSpecs(0));
+ verifyOutputSpecAndProto(outputSpec2, vertexProto.getOutputSpecs(1));
}
@@ -120,11 +116,10 @@ public class TestConverters {
TezTaskID tezTaskId = TezTaskID.getInstance(tezVertexId, 500);
TezTaskAttemptID tezTaskAttemptId = TezTaskAttemptID.getInstance(tezTaskId, 600);
- FragmentSpecProto.Builder builder = FragmentSpecProto.newBuilder();
- builder.setFragmentIdentifierString(tezTaskAttemptId.toString());
+ SignableVertexSpec.Builder builder = SignableVertexSpec.newBuilder();
+ builder.setVertexIdentifier(Converters.createVertexIdentifier(tezTaskAttemptId, 0));
builder.setDagName("dagName");
builder.setVertexName("vertexName");
- builder.setDagId(tezDagId.getId());
builder.setProcessorDescriptor(
EntityDescriptorProto.newBuilder().setClassName("fakeProcessorName").setUserPayload(
UserPayloadProto.newBuilder().setUserPayload(ByteString.copyFrom(procBb))));
@@ -145,9 +140,9 @@ public class TestConverters {
EntityDescriptorProto.newBuilder().setClassName("outputClassName").setUserPayload(
UserPayloadProto.newBuilder().setUserPayload(ByteString.copyFrom(output1Bb)))));
- FragmentSpecProto fragmentSpecProto = builder.build();
+ SignableVertexSpec vertexProto = builder.build();
- TaskSpec taskSpec = Converters.getTaskSpecfromProto(fragmentSpecProto);
+ TaskSpec taskSpec = Converters.getTaskSpecfromProto(vertexProto, 0, 0, null);
assertEquals("dagName", taskSpec.getDAGName());
assertEquals("vertexName", taskSpec.getVertexName());
@@ -160,12 +155,10 @@ public class TestConverters {
assertEquals(2, taskSpec.getInputs().size());
assertEquals(2, taskSpec.getOutputs().size());
- verifyInputSpecAndProto(taskSpec.getInputs().get(0), fragmentSpecProto.getInputSpecs(0));
- verifyInputSpecAndProto(taskSpec.getInputs().get(1), fragmentSpecProto.getInputSpecs(1));
- verifyOutputSpecAndProto(taskSpec.getOutputs().get(0), fragmentSpecProto.getOutputSpecs(0));
- verifyOutputSpecAndProto(taskSpec.getOutputs().get(1), fragmentSpecProto.getOutputSpecs(1));
-
-
+ verifyInputSpecAndProto(taskSpec.getInputs().get(0), vertexProto.getInputSpecs(0));
+ verifyInputSpecAndProto(taskSpec.getInputs().get(1), vertexProto.getInputSpecs(1));
+ verifyOutputSpecAndProto(taskSpec.getOutputs().get(0), vertexProto.getOutputSpecs(0));
+ verifyOutputSpecAndProto(taskSpec.getOutputs().get(1), vertexProto.getOutputSpecs(1));
}
private void verifyInputSpecAndProto(InputSpec inputSpec,
http://git-wip-us.apache.org/repos/asf/hive/blob/0b5c27fd/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 78b37f7..2bfe3ed 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -33,11 +33,11 @@ import org.apache.hadoop.hive.llap.daemon.HistoryLogger;
import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GroupInputSpecProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto;
@@ -45,7 +45,9 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWor
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexIdentifier;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
+import org.apache.hadoop.hive.llap.tez.Converters;
import org.apache.hadoop.hive.ql.exec.tez.TezProcessor;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.security.Credentials;
@@ -151,32 +153,35 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
@Override
public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws IOException {
- HistoryLogger.logFragmentStart(request.getApplicationIdString(), request.getContainerIdString(),
- localAddress.get().getHostName(), request.getFragmentSpec().getDagName(), request.getFragmentSpec().getDagId(),
- request.getFragmentSpec().getVertexName(), request.getFragmentSpec().getFragmentNumber(),
- request.getFragmentSpec().getAttemptNumber());
+ // TODO: also support the binary form of the work spec (vertexBinary). We should handle it here and
+ // stop passing the protobuf around; pass plain objects/values instead.
+ SignableVertexSpec vertex = request.getWorkSpec().getVertex();
if (LOG.isInfoEnabled()) {
- LOG.info("Queueing container for execution: " + stringifySubmitRequest(request));
+ LOG.info("Queueing container for execution: " + stringifySubmitRequest(request, vertex));
}
+ VertexIdentifier vId = vertex.getVertexIdentifier();
+ TezTaskAttemptID attemptId = Converters.createTaskAttemptId(
+ vId, request.getFragmentNumber(), request.getAttemptNumber());
+ String fragmentIdString = attemptId.toString();
+ HistoryLogger.logFragmentStart(vId.getApplicationIdString(), request.getContainerIdString(),
+ localAddress.get().getHostName(), vertex.getDagName(), vId.getDagId(),
+ vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber());
// This is the start of container-annotated logging.
// TODO Reduce the length of this string. Way too verbose at the moment.
- String ndcContextString = request.getFragmentSpec().getFragmentIdentifierString();
- NDC.push(ndcContextString);
+ NDC.push(fragmentIdString);
Scheduler.SubmissionState submissionState;
SubmitWorkResponseProto.Builder responseBuilder = SubmitWorkResponseProto.newBuilder();
try {
Map<String, String> env = new HashMap<>();
// TODO What else is required in this environment map.
env.putAll(localEnv);
- env.put(ApplicationConstants.Environment.USER.name(), request.getUser());
+ env.put(ApplicationConstants.Environment.USER.name(), vertex.getUser());
- FragmentSpecProto fragmentSpec = request.getFragmentSpec();
- TezTaskAttemptID taskAttemptId = TezTaskAttemptID.fromString(
- fragmentSpec.getFragmentIdentifierString());
+ TezTaskAttemptID taskAttemptId = TezTaskAttemptID.fromString(fragmentIdString);
int dagIdentifier = taskAttemptId.getTaskID().getVertexID().getDAGId().getId();
QueryIdentifier queryIdentifier = new QueryIdentifier(
- request.getApplicationIdString(), dagIdentifier);
+ vId.getApplicationIdString(), dagIdentifier);
Credentials credentials = new Credentials();
DataInputBuffer dib = new DataInputBuffer();
@@ -186,14 +191,10 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
- QueryFragmentInfo fragmentInfo = queryTracker
- .registerFragment(queryIdentifier, request.getApplicationIdString(),
- fragmentSpec.getDagName(),
- dagIdentifier,
- fragmentSpec.getVertexName(), fragmentSpec.getFragmentNumber(),
- fragmentSpec.getAttemptNumber(), request.getUser(), request.getFragmentSpec(),
- jobToken);
-
+ QueryFragmentInfo fragmentInfo = queryTracker.registerFragment(
+ queryIdentifier, vId.getApplicationIdString(), vertex.getDagName(), dagIdentifier,
+ vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber(),
+ vertex.getUser(), vertex, jobToken, fragmentIdString);
String[] localDirs = fragmentInfo.getLocalDirs();
Preconditions.checkNotNull(localDirs);
@@ -202,14 +203,16 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
}
// May need to setup localDir for re-localization, which is usually setup as Environment.PWD.
// Used for re-localization, to add the user specified configuration (conf_pb_binary_stream)
- TaskRunnerCallable callable = new TaskRunnerCallable(request, fragmentInfo, new Configuration(getConfig()),
+
+ Configuration callableConf = new Configuration(getConfig());
+ TaskRunnerCallable callable = new TaskRunnerCallable(request, fragmentInfo, callableConf,
new LlapExecutionContext(localAddress.get().getHostName(), queryTracker), env,
credentials, memoryPerExecutor, amReporter, confParams, metrics, killedTaskHandler,
- this, tezHadoopShim);
+ this, tezHadoopShim, attemptId);
submissionState = executorService.schedule(callable);
if (LOG.isInfoEnabled()) {
- LOG.info("SubmissionState for {} : {} ", ndcContextString, submissionState);
+ LOG.info("SubmissionState for {} : {} ", fragmentIdString, submissionState);
}
if (submissionState.equals(Scheduler.SubmissionState.REJECTED)) {
@@ -300,24 +303,25 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
return sb.toString();
}
- public static String stringifySubmitRequest(SubmitWorkRequestProto request) {
+ public static String stringifySubmitRequest(
+ SubmitWorkRequestProto request, SignableVertexSpec vertex) {
StringBuilder sb = new StringBuilder();
- FragmentSpecProto fragmentSpec = request.getFragmentSpec();
sb.append("am_details=").append(request.getAmHost()).append(":").append(request.getAmPort());
- sb.append(", taskInfo=").append(fragmentSpec.getFragmentIdentifierString());
- sb.append(", user=").append(request.getUser());
- sb.append(", appIdString=").append(request.getApplicationIdString());
- sb.append(", appAttemptNum=").append(request.getAppAttemptNumber());
+ sb.append(", taskInfo=").append(vertex.getVertexIdentifier()).append(" fragment ")
+ .append(request.getFragmentNumber()).append(" attempt ").append(request.getAttemptNumber());
+ sb.append(", user=").append(vertex.getUser());
+ sb.append(", appIdString=").append(vertex.getVertexIdentifier().getApplicationIdString());
+ sb.append(", appAttemptNum=").append(vertex.getVertexIdentifier().getAppAttemptNumber());
sb.append(", containerIdString=").append(request.getContainerIdString());
- sb.append(", dagName=").append(fragmentSpec.getDagName());
- sb.append(", vertexName=").append(fragmentSpec.getVertexName());
- sb.append(", processor=").append(fragmentSpec.getProcessorDescriptor().getClassName());
- sb.append(", numInputs=").append(fragmentSpec.getInputSpecsCount());
- sb.append(", numOutputs=").append(fragmentSpec.getOutputSpecsCount());
- sb.append(", numGroupedInputs=").append(fragmentSpec.getGroupedInputSpecsCount());
+ sb.append(", dagName=").append(vertex.getDagName());
+ sb.append(", vertexName=").append(vertex.getVertexName());
+ sb.append(", processor=").append(vertex.getProcessorDescriptor().getClassName());
+ sb.append(", numInputs=").append(vertex.getInputSpecsCount());
+ sb.append(", numOutputs=").append(vertex.getOutputSpecsCount());
+ sb.append(", numGroupedInputs=").append(vertex.getGroupedInputSpecsCount());
sb.append(", Inputs={");
- if (fragmentSpec.getInputSpecsCount() > 0) {
- for (IOSpecProto ioSpec : fragmentSpec.getInputSpecsList()) {
+ if (vertex.getInputSpecsCount() > 0) {
+ for (IOSpecProto ioSpec : vertex.getInputSpecsList()) {
sb.append("{").append(ioSpec.getConnectedVertexName()).append(",")
.append(ioSpec.getIoDescriptor().getClassName()).append(",")
.append(ioSpec.getPhysicalEdgeCount()).append("}");
@@ -325,8 +329,8 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
}
sb.append("}");
sb.append(", Outputs={");
- if (fragmentSpec.getOutputSpecsCount() > 0) {
- for (IOSpecProto ioSpec : fragmentSpec.getOutputSpecsList()) {
+ if (vertex.getOutputSpecsCount() > 0) {
+ for (IOSpecProto ioSpec : vertex.getOutputSpecsList()) {
sb.append("{").append(ioSpec.getConnectedVertexName()).append(",")
.append(ioSpec.getIoDescriptor().getClassName()).append(",")
.append(ioSpec.getPhysicalEdgeCount()).append("}");
@@ -334,8 +338,8 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
}
sb.append("}");
sb.append(", GroupedInputs={");
- if (fragmentSpec.getGroupedInputSpecsCount() > 0) {
- for (GroupInputSpecProto group : fragmentSpec.getGroupedInputSpecsList()) {
+ if (vertex.getGroupedInputSpecsCount() > 0) {
+ for (GroupInputSpecProto group : vertex.getGroupedInputSpecsList()) {
sb.append("{").append("groupName=").append(group.getGroupName()).append(", elements=")
.append(group.getGroupVerticesList()).append("}");
sb.append(group.getGroupVerticesList());
http://git-wip-us.apache.org/repos/asf/hive/blob/0b5c27fd/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java
index 480a394..195775e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java
@@ -21,8 +21,8 @@ import java.util.List;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,19 +35,20 @@ public class QueryFragmentInfo {
private final String vertexName;
private final int fragmentNumber;
private final int attemptNumber;
- private final FragmentSpecProto fragmentSpec;
+ private final SignableVertexSpec vertexSpec;
+ private final String fragmentIdString;
public QueryFragmentInfo(QueryInfo queryInfo, String vertexName, int fragmentNumber,
- int attemptNumber,
- FragmentSpecProto fragmentSpec) {
+ int attemptNumber, SignableVertexSpec vertexSpec, String fragmentIdString) {
Preconditions.checkNotNull(queryInfo);
Preconditions.checkNotNull(vertexName);
- Preconditions.checkNotNull(fragmentSpec);
+ Preconditions.checkNotNull(vertexSpec);
this.queryInfo = queryInfo;
this.vertexName = vertexName;
this.fragmentNumber = fragmentNumber;
this.attemptNumber = attemptNumber;
- this.fragmentSpec = fragmentSpec;
+ this.vertexSpec = vertexSpec;
+ this.fragmentIdString = fragmentIdString;
}
// Only meant for use by the QueryTracker
@@ -55,8 +56,8 @@ public class QueryFragmentInfo {
return this.queryInfo;
}
- public FragmentSpecProto getFragmentSpec() {
- return fragmentSpec;
+ public SignableVertexSpec getVertexSpec() {
+ return vertexSpec;
}
public String getVertexName() {
@@ -72,7 +73,7 @@ public class QueryFragmentInfo {
}
public String getFragmentIdentifierString() {
- return fragmentSpec.getFragmentIdentifierString();
+ return fragmentIdString;
}
/**
@@ -85,7 +86,7 @@ public class QueryFragmentInfo {
* @return true if the task can finish, false otherwise
*/
public boolean canFinish() {
- List<IOSpecProto> inputSpecList = fragmentSpec.getInputSpecsList();
+ List<IOSpecProto> inputSpecList = vertexSpec.getInputSpecsList();
boolean canFinish = true;
if (inputSpecList != null && !inputSpecList.isEmpty()) {
for (IOSpecProto inputSpec : inputSpecList) {
@@ -126,7 +127,7 @@ public class QueryFragmentInfo {
public boolean registerForFinishableStateUpdates(FinishableStateUpdateHandler handler,
boolean lastFinishableState) {
List<String> sourcesOfInterest = new LinkedList<>();
- List<IOSpecProto> inputSpecList = fragmentSpec.getInputSpecsList();
+ List<IOSpecProto> inputSpecList = vertexSpec.getInputSpecsList();
if (inputSpecList != null && !inputSpecList.isEmpty()) {
for (IOSpecProto inputSpec : inputSpecList) {
if (LlapTezUtils.isSourceOfInterest(inputSpec.getIoDescriptor().getClassName())) {
http://git-wip-us.apache.org/repos/asf/hive/blob/0b5c27fd/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
index 8daef9e..6914134 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
@@ -35,7 +35,7 @@ import com.google.common.collect.Multimap;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto;
public class QueryInfo {
@@ -92,9 +92,10 @@ public class QueryInfo {
return sourceStateMap;
}
- public QueryFragmentInfo registerFragment(String vertexName, int fragmentNumber, int attemptNumber, FragmentSpecProto fragmentSpec) {
- QueryFragmentInfo fragmentInfo = new QueryFragmentInfo(this, vertexName, fragmentNumber, attemptNumber,
- fragmentSpec);
+ public QueryFragmentInfo registerFragment(String vertexName, int fragmentNumber,
+ int attemptNumber, SignableVertexSpec vertexSpec, String fragmentIdString) {
+ QueryFragmentInfo fragmentInfo = new QueryFragmentInfo(
+ this, vertexName, fragmentNumber, attemptNumber, vertexSpec, fragmentIdString);
knownFragments.add(fragmentInfo);
return fragmentInfo;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0b5c27fd/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index cb3be2b..8abd198 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto;
import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler;
import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
@@ -113,20 +113,11 @@ public class QueryTracker extends AbstractService {
/**
* Register a new fragment for a specific query
- * @param queryIdentifier
- * @param appIdString
- * @param dagName
- * @param dagIdentifier
- * @param vertexName
- * @param fragmentNumber
- * @param attemptNumber
- * @param user
- * @throws IOException
*/
QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appIdString,
String dagName, int dagIdentifier, String vertexName, int fragmentNumber, int attemptNumber,
- String user, FragmentSpecProto fragmentSpec, Token<JobTokenIdentifier> appToken)
- throws IOException {
+ String user, SignableVertexSpec vertex, Token<JobTokenIdentifier> appToken,
+ String fragmentIdString) throws IOException {
ReadWriteLock dagLock = getDagLock(queryIdentifier);
dagLock.readLock().lock();
try {
@@ -166,7 +157,8 @@ public class QueryTracker extends AbstractService {
.registerDag(appIdString, dagIdentifier, appToken,
user, queryInfo.getLocalDirs());
- return queryInfo.registerFragment(vertexName, fragmentNumber, attemptNumber, fragmentSpec);
+ return queryInfo.registerFragment(
+ vertexName, fragmentNumber, attemptNumber, vertex, fragmentIdString);
} finally {
dagLock.readLock().unlock();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0b5c27fd/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index 1933eb1..eac0e8f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -43,7 +43,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
import org.apache.hadoop.service.AbstractService;
import org.apache.tez.runtime.task.EndReason;
@@ -191,8 +191,8 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
TaskWrapper task = e.getValue();
boolean isFirst = true;
TaskRunnerCallable c = task.getTaskRunnerCallable();
- if (c != null && c.getRequest() != null && c.getRequest().getFragmentSpec() != null) {
- FragmentSpecProto fs = c.getRequest().getFragmentSpec();
+ if (c != null && c.getVertexSpec() != null) {
+ SignableVertexSpec fs = c.getVertexSpec();
value.append(isFirst ? " (" : ", ").append(fs.getDagName())
.append("/").append(fs.getVertexName());
isFirst = false;
@@ -781,7 +781,7 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
", firstAttemptStartTime=" + taskRunnerCallable.getFragmentRuntimeInfo().getFirstAttemptStartTime() +
", dagStartTime=" + taskRunnerCallable.getFragmentRuntimeInfo().getDagStartTime() +
", withinDagPriority=" + taskRunnerCallable.getFragmentRuntimeInfo().getWithinDagPriority() +
- ", vertexParallelism= " + taskRunnerCallable.getFragmentSpec().getVertexParallelism() +
+ ", vertexParallelism= " + taskRunnerCallable.getVertexSpec().getVertexParallelism() +
", selfAndUpstreamParallelism= " + taskRunnerCallable.getFragmentRuntimeInfo().getNumSelfAndUpstreamTasks() +
", selfAndUpstreamComplete= " + taskRunnerCallable.getFragmentRuntimeInfo().getNumSelfAndUpstreamCompletedTasks() +
'}';
http://git-wip-us.apache.org/repos/asf/hive/blob/0b5c27fd/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index fcfa940..3093de7 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -33,8 +33,8 @@ import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
import org.apache.hadoop.hive.llap.daemon.HistoryLogger;
import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
@@ -113,6 +113,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
private final AtomicBoolean isStarted = new AtomicBoolean(false);
private final AtomicBoolean isCompleted = new AtomicBoolean(false);
private final AtomicBoolean killInvoked = new AtomicBoolean(false);
+ private final SignableVertexSpec vertex;
@VisibleForTesting
public TaskRunnerCallable(SubmitWorkRequestProto request, QueryFragmentInfo fragmentInfo,
@@ -123,7 +124,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
ConfParams confParams, LlapDaemonExecutorMetrics metrics,
KilledTaskHandler killedTaskHandler,
FragmentCompletionHandler fragmentCompleteHandler,
- HadoopShim tezHadoopShim) {
+ HadoopShim tezHadoopShim, TezTaskAttemptID attemptId) {
this.request = request;
this.fragmentInfo = fragmentInfo;
this.conf = conf;
@@ -134,17 +135,20 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
this.memoryAvailable = memoryAvailable;
this.confParams = confParams;
this.jobToken = TokenCache.getSessionToken(credentials);
- this.taskSpec = Converters.getTaskSpecfromProto(request.getFragmentSpec());
+ // TODO: support binary spec here or above
+ this.vertex = request.getWorkSpec().getVertex();
+ this.taskSpec = Converters.getTaskSpecfromProto(
+ vertex, request.getFragmentNumber(), request.getAttemptNumber(), attemptId);
this.amReporter = amReporter;
// Register with the AMReporter when the callable is setup. Unregister once it starts running.
if (jobToken != null) {
this.amReporter.registerTask(request.getAmHost(), request.getAmPort(),
- request.getUser(), jobToken, fragmentInfo.getQueryInfo().getQueryIdentifier());
+ vertex.getUser(), jobToken, fragmentInfo.getQueryInfo().getQueryIdentifier());
}
this.metrics = metrics;
- this.requestId = request.getFragmentSpec().getFragmentIdentifierString();
+ this.requestId = taskSpec.getTaskAttemptID().toString();
// TODO Change this to the queryId/Name when that's available.
- this.queryId = request.getFragmentSpec().getDagName();
+ this.queryId = vertex.getDagName();
this.killedTaskHandler = killedTaskHandler;
this.fragmentCompletionHanler = fragmentCompleteHandler;
this.tezHadoopShim = tezHadoopShim;
@@ -184,16 +188,16 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
// TODO Consolidate this code with TezChild.
runtimeWatch.start();
- UserGroupInformation taskUgi = UserGroupInformation.createRemoteUser(request.getUser());
+ UserGroupInformation taskUgi = UserGroupInformation.createRemoteUser(vertex.getUser());
taskUgi.addCredentials(credentials);
Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<>();
serviceConsumerMetadata.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
TezCommonUtils.convertJobTokenToBytes(jobToken));
- Multimap<String, String> startedInputsMap = createStartedInputMap(request.getFragmentSpec());
+ Multimap<String, String> startedInputsMap = createStartedInputMap(vertex);
UserGroupInformation taskOwner =
- UserGroupInformation.createRemoteUser(request.getTokenIdentifier());
+ UserGroupInformation.createRemoteUser(vertex.getTokenIdentifier());
final InetSocketAddress address =
NetUtils.createSocketAddrForHost(request.getAmHost(), request.getAmPort());
SecurityUtil.setTokenService(jobToken, address);
@@ -228,7 +232,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
if (shouldRunTask) {
taskRunner = new TezTaskRunner2(conf, taskUgi, fragmentInfo.getLocalDirs(),
taskSpec,
- request.getAppAttemptNumber(),
+ vertex.getVertexIdentifier().getAppAttemptNumber(),
serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor,
objectRegistry,
pid,
@@ -313,7 +317,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
*/
public void reportTaskKilled() {
killedTaskHandler
- .taskKilled(request.getAmHost(), request.getAmPort(), request.getUser(), jobToken,
+ .taskKilled(request.getAmHost(), request.getAmPort(), vertex.getUser(), jobToken,
fragmentInfo.getQueryInfo().getQueryIdentifier(), taskSpec.getTaskAttemptID());
}
@@ -321,15 +325,15 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
return fragmentInfo.canFinish();
}
- private Multimap<String, String> createStartedInputMap(FragmentSpecProto fragmentSpec) {
+ private static Multimap<String, String> createStartedInputMap(SignableVertexSpec vertex) {
Multimap<String, String> startedInputMap = HashMultimap.create();
// Let the Processor control start for Broadcast inputs.
// TODO For now, this affects non broadcast unsorted cases as well. Make use of the edge
// property when it's available.
- for (IOSpecProto inputSpec : fragmentSpec.getInputSpecsList()) {
+ for (IOSpecProto inputSpec : vertex.getInputSpecsList()) {
if (inputSpec.getIoDescriptor().getClassName().equals(UnorderedKVInput.class.getName())) {
- startedInputMap.put(fragmentSpec.getVertexName(), inputSpec.getConnectedVertexName());
+ startedInputMap.put(vertex.getVertexName(), inputSpec.getConnectedVertexName());
}
}
return startedInputMap;
@@ -350,7 +354,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
@Override
public String toString() {
return requestId + " {canFinish: " + canFinish() +
- ", vertexParallelism: " + request.getFragmentSpec().getVertexParallelism() +
+ ", vertexParallelism: " + vertex.getVertexParallelism() +
", selfAndUpstreamParallelism: " + request.getFragmentRuntimeInfo().getNumSelfAndUpstreamTasks() +
", selfAndUpstreamComplete: " + request.getFragmentRuntimeInfo().getNumSelfAndUpstreamCompletedTasks() +
", firstAttemptStartTime: " + getFragmentRuntimeInfo().getFirstAttemptStartTime() +
@@ -454,14 +458,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
fragmentCompletionHanler.fragmentComplete(fragmentInfo);
taskRunnerCallable.shutdown();
- HistoryLogger
- .logFragmentEnd(request.getApplicationIdString(), request.getContainerIdString(),
- executionContext.getHostName(), request.getFragmentSpec().getDagName(),
- fragmentInfo.getQueryInfo().getDagIdentifier(),
- request.getFragmentSpec().getVertexName(),
- request.getFragmentSpec().getFragmentNumber(),
- request.getFragmentSpec().getAttemptNumber(), taskRunnerCallable.threadName,
- taskRunnerCallable.startTime, true);
+ logFragmentEnd(true);
}
@Override
@@ -471,14 +468,15 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
fragmentCompletionHanler.fragmentComplete(fragmentInfo);
// TODO HIVE-10236 Report a fatal error over the umbilical
taskRunnerCallable.shutdown();
- HistoryLogger
- .logFragmentEnd(request.getApplicationIdString(), request.getContainerIdString(),
- executionContext.getHostName(), request.getFragmentSpec().getDagName(),
- fragmentInfo.getQueryInfo().getDagIdentifier(),
- request.getFragmentSpec().getVertexName(),
- request.getFragmentSpec().getFragmentNumber(),
- request.getFragmentSpec().getAttemptNumber(), taskRunnerCallable.threadName,
- taskRunnerCallable.startTime, false);
+ logFragmentEnd(false);
+ }
+
+ protected void logFragmentEnd(boolean success) {
+ HistoryLogger.logFragmentEnd(vertex.getVertexIdentifier().getApplicationIdString(),
+ request.getContainerIdString(), executionContext.getHostName(), vertex.getDagName(),
+ fragmentInfo.getQueryInfo().getDagIdentifier(), vertex.getVertexName(),
+ request.getFragmentNumber(), request.getAttemptNumber(), taskRunnerCallable.threadName,
+ taskRunnerCallable.startTime, success);
}
}
@@ -498,12 +496,14 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
public static String getTaskIdentifierString(
SubmitWorkRequestProto request) {
StringBuilder sb = new StringBuilder();
- sb.append("AppId=").append(request.getApplicationIdString())
+ // TODO: also support the binary version
+ SignableVertexSpec vertex = request.getWorkSpec().getVertex();
+ sb.append("AppId=").append(vertex.getVertexIdentifier().getApplicationIdString())
.append(", containerId=").append(request.getContainerIdString())
- .append(", Dag=").append(request.getFragmentSpec().getDagName())
- .append(", Vertex=").append(request.getFragmentSpec().getVertexName())
- .append(", FragmentNum=").append(request.getFragmentSpec().getFragmentNumber())
- .append(", Attempt=").append(request.getFragmentSpec().getAttemptNumber());
+ .append(", Dag=").append(vertex.getDagName())
+ .append(", Vertex=").append(vertex.getVertexName())
+ .append(", FragmentNum=").append(request.getFragmentNumber())
+ .append(", Attempt=").append(request.getAttemptNumber());
return sb.toString();
}
@@ -511,7 +511,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
return request.getFragmentRuntimeInfo();
}
- public FragmentSpecProto getFragmentSpec() {
- return request.getFragmentSpec();
+ public SignableVertexSpec getVertexSpec() {
+ // TODO: support for binary spec? presumably we'd parse it somewhere earlier
+ return vertex;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0b5c27fd/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
index c6ba14e..d699f20 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
@@ -26,9 +26,11 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrBinary;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
+import org.apache.hadoop.hive.llap.tez.Converters;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.dag.records.TezDAGID;
@@ -51,26 +53,25 @@ public class TaskExecutorTestHelpers {
SubmitWorkRequestProto
requestProto = createSubmitWorkRequestProto(fragmentNum, parallelism,
startTime);
- QueryFragmentInfo queryFragmentInfo = createQueryFragmentInfo(requestProto.getFragmentSpec());
+ QueryFragmentInfo queryFragmentInfo = createQueryFragmentInfo(
+ requestProto.getWorkSpec().getVertex(), requestProto.getFragmentNumber());
MockRequest mockRequest = new MockRequest(requestProto, queryFragmentInfo, canFinish, workTime);
return mockRequest;
}
public static TaskExecutorService.TaskWrapper createTaskWrapper(
SubmitWorkRequestProto request, boolean canFinish, int workTime) {
- QueryFragmentInfo queryFragmentInfo = createQueryFragmentInfo(request.getFragmentSpec());
+ QueryFragmentInfo queryFragmentInfo = createQueryFragmentInfo(
+ request.getWorkSpec().getVertex(), request.getFragmentNumber());
MockRequest mockRequest = new MockRequest(request, queryFragmentInfo, canFinish, workTime);
TaskExecutorService.TaskWrapper
taskWrapper = new TaskExecutorService.TaskWrapper(mockRequest, null);
return taskWrapper;
}
- public static QueryFragmentInfo createQueryFragmentInfo(FragmentSpecProto fragmentSpecProto) {
- QueryInfo queryInfo = createQueryInfo();
- QueryFragmentInfo fragmentInfo =
- new QueryFragmentInfo(queryInfo, "fakeVertexName", fragmentSpecProto.getFragmentNumber(), 0,
- fragmentSpecProto);
- return fragmentInfo;
+ public static QueryFragmentInfo createQueryFragmentInfo(
+ SignableVertexSpec vertex, int fragmentNum) {
+ return new QueryFragmentInfo(createQueryInfo(), "fakeVertexName", fragmentNum, 0, vertex, "");
}
public static QueryInfo createQueryInfo() {
@@ -100,20 +101,23 @@ public class TaskExecutorTestHelpers {
TezTaskAttemptID taId = TezTaskAttemptID.getInstance(tId, fragmentNumber);
return SubmitWorkRequestProto
.newBuilder()
- .setFragmentSpec(
- FragmentSpecProto
- .newBuilder()
- .setAttemptNumber(0)
+ .setAttemptNumber(0)
+ .setFragmentNumber(fragmentNumber)
+ .setWorkSpec(
+ VertexOrBinary.newBuilder().setVertex(
+ SignableVertexSpec.newBuilder()
.setDagName("MockDag")
- .setFragmentNumber(fragmentNumber)
+ .setUser("MockUser")
+ .setTokenIdentifier("MockToken_1")
+ .setVertexIdentifier(Converters.createVertexIdentifier(taId, 0))
.setVertexName("MockVertex")
.setProcessorDescriptor(
LlapDaemonProtocolProtos.EntityDescriptorProto.newBuilder()
.setClassName("MockProcessor").build())
- .setFragmentIdentifierString(taId.toString()).build()).setAmHost("localhost")
- .setAmPort(12345).setAppAttemptNumber(0).setApplicationIdString("MockApp_1")
- .setContainerIdString("MockContainer_1").setUser("MockUser")
- .setTokenIdentifier("MockToken_1")
+ .build()).build())
+ .setAmHost("localhost")
+ .setAmPort(12345)
+ .setContainerIdString("MockContainer_1")
.setFragmentRuntimeInfo(LlapDaemonProtocolProtos
.FragmentRuntimeInfo
.newBuilder()
@@ -146,7 +150,7 @@ public class TaskExecutorTestHelpers {
new ExecutionContextImpl("localhost"), null, new Credentials(), 0, null, null, mock(
LlapDaemonExecutorMetrics.class),
mock(KilledTaskHandler.class), mock(
- FragmentCompletionHandler.class), new DefaultHadoopShim());
+ FragmentCompletionHandler.class), new DefaultHadoopShim(), null);
this.workTime = workTime;
this.canFinish = canFinish;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0b5c27fd/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
index 08ee769..a250882 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
@@ -31,8 +31,11 @@ import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorService.TaskWrapper;
import org.apache.hadoop.hive.llap.daemon.impl.TaskRunnerCallable;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexIdentifier;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrBinary;
+import org.apache.hadoop.hive.llap.tez.Converters;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.dag.records.TezDAGID;
@@ -59,7 +62,7 @@ public class TestFirstInFirstOutComparator {
super(requestProto, mock(QueryFragmentInfo.class), conf,
new ExecutionContextImpl("localhost"), null, cred, 0, null, null, null,
mock(KilledTaskHandler.class), mock(
- FragmentCompletionHandler.class), new DefaultHadoopShim());
+ FragmentCompletionHandler.class), new DefaultHadoopShim(), null);
this.workTime = workTime;
this.canFinish = canFinish;
}
@@ -102,19 +105,23 @@ public class TestFirstInFirstOutComparator {
TezTaskAttemptID taId = TezTaskAttemptID.getInstance(tId, fragmentNumber);
return SubmitWorkRequestProto
.newBuilder()
- .setFragmentSpec(
- FragmentSpecProto
+ .setAttemptNumber(0)
+ .setFragmentNumber(fragmentNumber)
+ .setWorkSpec(
+ VertexOrBinary.newBuilder().setVertex(
+ SignableVertexSpec
.newBuilder()
- .setAttemptNumber(0)
+ .setVertexIdentifier(Converters.createVertexIdentifier(taId, 0))
.setDagName("MockDag")
- .setFragmentNumber(fragmentNumber)
.setVertexName("MockVertex")
+ .setUser("MockUser")
+ .setTokenIdentifier("MockToken_1")
.setProcessorDescriptor(
EntityDescriptorProto.newBuilder().setClassName("MockProcessor").build())
- .setFragmentIdentifierString(taId.toString()).build()).setAmHost("localhost")
- .setAmPort(12345).setAppAttemptNumber(0).setApplicationIdString("MockApp_1")
- .setContainerIdString("MockContainer_1").setUser("MockUser")
- .setTokenIdentifier("MockToken_1")
+ .build()).build())
+ .setAmHost("localhost")
+ .setAmPort(12345)
+ .setContainerIdString("MockContainer_1")
.setFragmentRuntimeInfo(LlapDaemonProtocolProtos
.FragmentRuntimeInfo
.newBuilder()
http://git-wip-us.apache.org/repos/asf/hive/blob/0b5c27fd/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index b4b041a..a3f2eb8 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWor
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrBinary;
import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
import org.apache.hadoop.hive.llap.registry.ServiceInstance;
import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
@@ -89,10 +90,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
private static final Logger LOG = LoggerFactory.getLogger(LlapTaskCommunicator.class);
private static final boolean isInfoEnabled = LOG.isInfoEnabled();
- private static final boolean isDebugEnabed = LOG.isDebugEnabled();
-
- private final SubmitWorkRequestProto BASE_SUBMIT_WORK_REQUEST;
-
+
private final ConcurrentMap<QueryIdentifierProto, ByteBuffer> credentialMap;
// Tracks containerIds and taskAttemptIds, so can be kept independent of the running DAG.
@@ -105,6 +103,8 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
private long deleteDelayOnDagComplete;
private final LlapTaskUmbilicalProtocol umbilical;
private final Token<LlapTokenIdentifier> token;
+ private final int appAttemptId;
+ private final String user;
// These two structures track the list of known nodes, and the list of nodes which are sending in keep-alive heartbeats.
// Primarily for debugging purposes a.t.m, since there's some unexplained TASK_TIMEOUTS which are currently being observed.
@@ -113,8 +113,6 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
private final LlapRegistryService serviceRegistry;
-
- private volatile int currentDagId;
private volatile QueryIdentifierProto currentQueryIdentifierProto;
public LlapTaskCommunicator(
@@ -138,17 +136,10 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
serviceRegistry = LlapRegistryService.getClient(conf);
umbilical = new LlapTaskUmbilicalProtocolImpl(getUmbilical());
- SubmitWorkRequestProto.Builder baseBuilder = SubmitWorkRequestProto.newBuilder();
// TODO Avoid reading this from the environment
- baseBuilder.setUser(System.getenv(ApplicationConstants.Environment.USER.name()));
- baseBuilder.setApplicationIdString(
- taskCommunicatorContext.getApplicationAttemptId().getApplicationId().toString());
- baseBuilder
- .setAppAttemptNumber(taskCommunicatorContext.getApplicationAttemptId().getAttemptId());
- baseBuilder.setTokenIdentifier(getTokenIdentifier());
-
- BASE_SUBMIT_WORK_REQUEST = baseBuilder.build();
+ user = System.getenv(ApplicationConstants.Environment.USER.name());
+ appAttemptId = taskCommunicatorContext.getApplicationAttemptId().getAttemptId();
credentialMap = new ConcurrentHashMap<>();
sourceStateTracker = new SourceStateTracker(getContext(), this);
@@ -316,7 +307,6 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
t = se.getCause();
}
if (t instanceof RemoteException) {
- RemoteException re = (RemoteException) t;
// All others from the remote service cause the task to FAIL.
LOG.info(
"Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " +
@@ -591,8 +581,9 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
TaskSpec taskSpec,
FragmentRuntimeInfo fragmentRuntimeInfo) throws
IOException {
- SubmitWorkRequestProto.Builder builder =
- SubmitWorkRequestProto.newBuilder(BASE_SUBMIT_WORK_REQUEST);
+ SubmitWorkRequestProto.Builder builder = SubmitWorkRequestProto.newBuilder();
+ builder.setFragmentNumber(taskSpec.getTaskAttemptID().getTaskID().getId());
+ builder.setAttemptNumber(taskSpec.getTaskAttemptID().getId());
builder.setContainerIdString(containerId.toString());
builder.setAmHost(getAddress().getHostName());
builder.setAmPort(getAddress().getPort());
@@ -607,7 +598,9 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
credentialsBinary = credentialsBinary.duplicate();
}
builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary));
- builder.setFragmentSpec(Converters.convertTaskSpecToProto(taskSpec));
+ builder.setWorkSpec(VertexOrBinary.newBuilder().setVertex(Converters.convertTaskSpecToProto(
+ taskSpec, appAttemptId, getTokenIdentifier(), null, user)).build());
+ // Don't call builder.setWorkSpecSignature() - Tez doesn't sign fragments
builder.setFragmentRuntimeInfo(fragmentRuntimeInfo);
return builder.build();
}